"""Distributions used in experiments and simulations. The module provides both empirical and theoretical distributions to reproduce specific spectra of momenta."""
from __future__ import annotations
from typing import TYPE_CHECKING, overload
from warnings import warn
import numpy as np
from scipy.integrate import quad, solve_ivp
from scipy.stats import gaussian_kde
from skimage.transform import resize
if TYPE_CHECKING:
from collections.abc import Callable
from typing import Literal
from jaxtyping import Float, Integer
from numpy.random import Generator
from yacs.config import CfgNode
# Dummy variables for jaxtyping to prevent Ruff F821 errors.
# Defined strictly within TYPE_CHECKING so they cannot exist at runtime,
# ensuring they never overwrite or conflict with actual code variables.
n_samples: int = 1000
n_vars: int = 500
n_samples_sig: int = 1000
n_vars_sig: int = 500
n_steps: int = 100
n_spikes: int = 10
class Distribution:
"""An abstract base class for distributions."""
def __init__(self) -> None:
"""Initialize the distribution."""
# Parameters (set to placeholder values)
#
# - m^2 : inverse of the largest eigenvalue (mass)
# - l_- : lowest eigenvalue
# - l_+ : largest eigenvalue
self.m2: float = np.inf
self.lminus: float = -np.inf
self.lplus: float = np.inf
@overload
def pdf(self, x: float) -> float: ...
@overload
def pdf(
self,
x: Float[np.ndarray, n_samples],
) -> Float[np.ndarray, n_samples]: ...
def pdf(
self,
x: float | Float[np.ndarray, n_samples],
) -> float | Float[np.ndarray, n_samples]:
"""Compute the value of the *Probability Density Function* (PDF) of a random variable :math:`X` at given value(s) :math:`x`.
Parameters
----------
x : float or ndarray of floats
The value(s) at which to evaluate the PDF (shape: scalar or array of shape :math:`n`).
Returns
-------
float or ndarray of floats
The value(s) of the PDF at the given value(s) (shape: scalar or array of shape :math:`n`).
"""
raise NotImplementedError(
"All sublasses of Distribution must implement a custom `pdf` method!",
)
@overload
def cdf(self, x: float) -> float: ...
@overload
def cdf(
self,
x: Float[np.ndarray, n_samples],
) -> Float[np.ndarray, n_samples]: ...
def cdf(
self,
x: float | Float[np.ndarray, n_samples],
) -> float | Float[np.ndarray, n_samples]:
"""Compute the value of the *Cumulative Distribution Function* (CDF) of a random variable :math:`X` at given value(s) :math:`x`.
Parameters
----------
x : float or ndarray of floats
The value(s) at which to evaluate the CDF (shape: scalar or array of shape :math:`n`).
Returns
-------
float or ndarray of floats
The value(s) of the CDF at the given value(s) (shape: scalar or array of shape :math:`n`).
"""
raise NotImplementedError(
"All sublasses of Distribution must implement a custom `cdf` method!",
)
@overload
def dpdf(self, x: float, eps: float = 1.0e-6) -> float: ...
@overload
def dpdf(
self,
x: Float[np.ndarray, n_samples],
eps: float = 1.0e-6,
) -> Float[np.ndarray, n_samples]: ...
def dpdf(
self,
x: float | Float[np.ndarray, n_samples],
eps: float = 1.0e-6,
) -> float | Float[np.ndarray, n_samples]:
r"""Compute the derivative of the PDF of a random variable :math:`X` at given value(s) :math:`x`.
The derivative is computed using the finite difference method:
.. math::
f'(x) \approx \frac{f(x + \epsilon) - f(x - \epsilon)}{2 \epsilon}
Parameters
----------
x : float or ndarray of floats
The point(s) at which to compute the derivative of the PDF (shape: scalar or array of shape :math:`n`).
eps : float
The incremental ratio :math:`\epsilon`, by default :math:`10^{-9}`.
Returns
-------
float or ndarray of floats
The value(s) of the derivative of the PDF (shape: scalar or array of shape :math:`n`).
"""
if not isinstance(x, float):
return np.vectorize(self.dpdf, otypes=[np.float64])(x)
return self._diff(self.pdf, x, eps)
@overload
def ipdf(self, x: float) -> float: ...
@overload
def ipdf(
self,
x: Float[np.ndarray, n_samples],
) -> Float[np.ndarray, n_samples]: ...
def ipdf(
self,
x: float | Float[np.ndarray, n_samples],
) -> float | Float[np.ndarray, n_samples]:
"""Compute the PDF of the inverse of the random variable :math:`X` (i.e. :math:`Y = 1 / X`) at given value(s) :math:`x`.
Parameters
----------
x : float or ndarray of floats
The value(s) at which to evaluate the PDF of the inverse variable (shape: scalar or array of shape :math:`n`).
Returns
-------
float or ndarray of floats
The value(s) of the PDF of the inverse variable at the given value(s) (shape: scalar or array of shape :math:`n`).
"""
# If x is not a scalar, then vectorize the function
if not isinstance(x, float):
return np.vectorize(self.ipdf, otypes=[np.float64])(x)
# In order to reproduce the momentum distribution, we need to consider
# a change of variable:
#
# 1) we consider lambda' = lambda - self.lminus,
#
# where lambda is an eigenvalue.
#
# This first change is to ensure that the distribution is shifted to
# the origin (i.e. the lowest eigenvalue is zero).
#
# We then consider the change of variable:
#
# 2) lambda' = 1 / (k^2 + m^2),
#
# in order to switch to momenta (shifted by the mass m^2, which is the
# inverse of the largest eigenvalue).
return self.pdf(1 / (x + self.m2) + self.lminus) / (x + self.m2) ** 2
@overload
def lnipdf(self, x: float) -> float: ...
@overload
def lnipdf(
self,
x: Float[np.ndarray, n_samples],
) -> Float[np.ndarray, n_samples]: ...
def lnipdf(
self,
x: float | Float[np.ndarray, n_samples],
) -> float | Float[np.ndarray, n_samples]:
"""Compute the natural log-PDF of the inverse of the random variable :math:`X` (i.e. :math:`Y = 1 / X`) at given value(s) :math:`x`.
Parameters
----------
x : float or ndarray of floats
The value(s) at which to evaluate the log-PDF of the inverse variable (shape: scalar or array of shape :math:`n`).
Returns
-------
float or ndarray of floats
The value(s) of the log-PDF of the inverse variable at the given value(s) (shape: scalar or array of shape :math:`n`).
"""
if not isinstance(x, float):
return np.vectorize(self.lnipdf, otypes=[np.float64])(x)
return float(np.log(self.ipdf(x)))
@overload
def icdf(self, x: float) -> float: ...
@overload
def icdf(
self,
x: Float[np.ndarray, n_samples],
) -> Float[np.ndarray, n_samples]: ...
def icdf(
self,
x: float | Float[np.ndarray, n_samples],
) -> float | Float[np.ndarray, n_samples]:
"""Compute the CDF of the inverse of the random variable :math:`X` (i.e. :math:`Y = 1 / X`) at given value(s) :math:`x`.
Parameters
----------
x : float or ndarray of floats
The value(s) at which to evaluate the CDF of the inverse variable (shape: scalar or array of shape :math:`n`).
Returns
-------
float or ndarray of floats
The value(s) of the CDF of the inverse variable at the given value(s) (shape: scalar or array of shape :math:`n`).
"""
if not isinstance(x, float):
return np.vectorize(self.icdf, otypes=[np.float64])(x)
if x <= 0.0:
return 0.0
return quad(self.ipdf, 0.0, x)[0]
@overload
def dipdf(self, x: float) -> float: ...
@overload
def dipdf(
self,
x: Float[np.ndarray, n_samples],
) -> Float[np.ndarray, n_samples]: ...
def dipdf(
self,
x: float | Float[np.ndarray, n_samples],
eps: float = 1.0e-6,
) -> float | Float[np.ndarray, n_samples]:
r"""Compute the derivative of the PDF of the inverse of the random variable :math:`X` (i.e. :math:`Y = 1 / X`) at given value(s) :math:`x`.
The derivative is computed using a central difference method:
.. math::
f'(x) \approx \frac{f(x + \epsilon) - f(x - \epsilon)}{2 \epsilon}
Parameters
----------
x : float or ndarray of floats
The value(s) at which to evaluate the derivative of the inverse PDF (shape: scalar or array of shape :math:`n`).
eps : float
The incremental ratio :math:`\epsilon`, by default :math:`10^{-9}`.
Returns
-------
float or ndarray of floats
The value(s) of the derivative of the inverse PDF at the given value(s) (shape: scalar or array of shape :math:`n`).
"""
if not isinstance(x, float):
return np.vectorize(self.dipdf, otypes=[np.float64])(x)
return self._diff(self.ipdf, x, eps)
@overload
def dlnipdf(self, x: float) -> float: ...
@overload
def dlnipdf(
self,
x: Float[np.ndarray, n_samples],
) -> Float[np.ndarray, n_samples]: ...
def dlnipdf(
self,
x: float | Float[np.ndarray, n_samples],
eps: float = 1.0e-6,
) -> float | Float[np.ndarray, n_samples]:
r"""Compute the derivative of the natural log-PDF of the inverse of the random variable :math:`X` (i.e. :math:`Y = 1 / X`) at given value(s) :math:`x`.
The derivative is computed using a central difference method:
.. math::
f'(x) \approx \frac{f(x + \epsilon) - f(x - \epsilon)}{2 \epsilon}
Parameters
----------
x : float or ndarray of floats
The value(s) at which to evaluate the derivative of the natural log-inverse PDF (shape: scalar or array of shape :math:`n`).
eps : float
The incremental ratio :math:`\epsilon`, by default :math:`10^{-9}`.
Returns
-------
float or ndarray of floats
The value(s) of the derivative of the natural log-inverse PDF at the given value(s) (shape: scalar or array of shape :math:`n`).
"""
if not isinstance(x, float):
return np.vectorize(self.dlnipdf, otypes=[np.float64])(x)
return self._diff(self.lnipdf, x, eps)
def _diff(
self,
func: Callable[[float], float],
x: float,
eps: float = 1.0e-6,
) -> float:
r"""Compute a derivative of a function using a central difference method.
Notes
-----
The formula used is a centered difference method:
.. math::
f'(x) \approx \frac{f(x + \epsilon) - f(x - \epsilon)}{2 \epsilon}
Parameters
----------
func: Callable
The function to evaluate.
x : float
The value at which to evaluate the derivative.
eps : float
The step to compute the incremental ratio, by default :math:`10^{-9}`.
Returns
-------
float
The value of the derivative of the PDF at the given value.
"""
num: float = func(x + eps) - func(x - eps)
den: float = 2.0 * eps
return num / den if den != 0.0 else 0.0
@overload
def canonical_dimensions(self, x: float) -> Float[np.ndarray, 4]: ...
@overload
def canonical_dimensions(
self,
x: Float[np.ndarray, n_samples],
) -> Float[np.ndarray, n_samples, 4]: ...
def canonical_dimensions(
self,
x: float | Float[np.ndarray, n_samples],
) -> Float[np.ndarray, 4] | Float[np.ndarray, n_samples, 4]:
r"""Compute the canonical dimensions of the distribution.
Notes
-----
The method uses the following formulae:
.. math::
\text{dim}_{u_2}(k^2) = \frac{\int^{k^2}_0 \mathrm{d}p^2 \rho(p^2)}{k^2 \rho(k^2)}
.. math::
\text{dim}_{u_4}(k^2) = \text{dim}_{u_2}(k^2) \left( 3 + k^2 \frac{\partial \ln \rho(k^2)}{\partial k^2} - 2 \right)
.. math::
\text{dim}_{u_6}(k^2) = -\text{dim}_{u_2}(k^2) + 2 \cdot \text{dim}_{u_4}(k^2)
.. math::
\text{dim}_{\chi}(k^2)= 1 - \text{dim}_{u_2}(k^2)
Parameters
----------
x : float or ndarray of floats
The value(s) at which to evaluate the canonical dimensions (shape: scalar or array of shape :math:`n`).
Returns
-------
ndarray of floats
An array containing, in columns (shape: :math:`4`, if input is scalar, or :math:`n \times 4`, if input is array):
- ``dim_u2``: the mass term
- ``dim_u4``: the quartic interaction
- ``dim_u6``: the sextic interaction
- ``dim_chi``: the dimension of the field
"""
if not isinstance(x, float):
return np.vectorize(
self.canonical_dimensions,
otypes=[np.float64],
signature="()->(n)",
)(x)
# Ignore the warning of zero value division and compute the dimensions
with np.errstate(divide="ignore", invalid="ignore"):
dimu2: float = self.icdf(x) / self.ipdf(x) / x
dimu4: float = dimu2 * (3.0 + x * self.dlnipdf(x)) - 2.0
dimu6: float = -dimu2 + 2.0 * dimu4
dimchi: float = 1.0 - dimu2
return np.array([dimu2, dimu4, dimu6, dimchi])
def _frg_equations_single(
self,
x: float,
u2: float,
u4: float,
u6: float,
dx: float = 1.0e-9,
return_rhs: bool = False,
) -> Float[np.ndarray, 3]:
"""Compute the FRG equations at a given point.
Parameters
----------
x : float
The momentum scale :math:`k^2` at which to start computing the FRG equations. This represents an energy scale in the UV region. The FRG computation ends in the IR region.
u2 : float
The initial value of the coupling :math:`u_2` at energy scale :math:`x`.
u4 : float
The initial value of the coupling :math:`u_4` at energy scale :math:`x`.
u6 : float
The initial value of the coupling :math:`u_6` at energy scale :math:`x`.
dx : float
The step size, by default ``1.0e-9``.
return_rhs : bool
Whether to return the right-hand side of the FRG equations instead of updating the couplings, by default False.
Returns
-------
ndarray of floats
An array containing, in columns (shape: :math:`3`):
- ``u2``: the running of the coupling :math:`u_2`
- ``u4``: the running of the coupling :math:`u_4`
- ``u6``: the running of the coupling :math:`u_6`
"""
# Compute the prefactor
pref: float = self.ipdf(x) / self.icdf(x)
# Canonical dimensions
dimu2: float
dimu4: float
dimu6: float
dimu2, dimu4, dimu6, _ = self.canonical_dimensions(x).T
# Right Hand Sides
u2_rhs: float = -dimu2 * u2 - 2.0 * u4 / (1.0 + u2) ** 2
u4_rhs: float = (
-dimu4 * u4
- 2.0 * u6 / (1.0 + u2) ** 2
+ 12.0 * u4**2 / (1.0 + u2) ** 3
)
u6_rhs: float = (
-dimu6 * u6
+ 60.0 * u4 * u6 / (1.0 + u2) ** 3
- 108.0 * u6**3 / (1.0 + u2) ** 4
)
# Return the RHS if requested by the solver
if return_rhs:
return pref * np.array([u2_rhs, u4_rhs, u6_rhs])
# Equations (the - sign is due to the flow towards the IR)
u2 -= dx * pref * u2_rhs
u4 -= dx * pref * u4_rhs
u6 -= dx * pref * u6_rhs
return np.array([u2, u4, u6])
def frg_equations(
self,
x: float,
u2_init: float,
u4_init: float,
u6_init: float,
dx: float = 1.0e-9,
x_ir: float = 0.0,
use_naive_euler: bool = True,
) -> Float[np.ndarray, n_steps, 4]:
r"""Compute the FRG equations.
Given initial conditions of the couplings :math:`u_2`, :math:`u_4`, and :math:`u_6` at a given momentum scale :math:`k^2`, the function returns the values of the couplings at the scale :math:`k^2 - \Delta_k`. That is, the function returns the running of the couplings from the UV region towards the IR (:math:`k^2 \to 0`):
.. math::
\dot{u}_2 = - \text{dim}_{u_2} u_2 - 2 \frac{u_4}{(1 + u_2)^2}
.. math::
\dot{u}_4 = - \text{dim}_{u_4} u_4 - 2 \frac{u_6}{(1 + u_2)^2} + 12 \frac{u_4^2}{(1 + u_2)^3}
.. math::
\dot{u}_6 = - \text{dim}_{u_6} u_6 + 60 \frac{u_4 u_6}{(1 + u_2)^3} - 108 \frac{u_6^3}{(1 + u_2)^4}
Parameters
----------
x : float
The momentum scale :math:`k^2` at which to start computing the FRG equations. This represents an energy scale in the UV region. The FRG computation ends in the IR region.
u2_init : float
The initial value of the coupling :math:`u_2` at energy scale :math:`x`.
u4_init : float
The initial value of the coupling :math:`u_4` at energy scale :math:`x`.
u6_init : float
The initial value of the coupling :math:`u_6` at energy scale :math:`x`.
dx : float
The step size, by default ``1.0e-9``.
x_ir : float
The definition of IR scale (i.e. the scale :math:`k^2` to be considered low-energy), by default 0.0. It can be overridden to match physical assumptions (e.g. histogram binning, etc.).
use_naive_euler : bool
Whether to use the naive Euler method for integration, by default True. If False, a more sophisticated integration method will be used.
Returns
-------
ndarray of floats
An array containing, in columns (shape: :math:`S \times 4` where :math:`S` is the number of steps):
- ``k2``: the energy scale corresponding to the couplings
- ``u2``: the running of the coupling :math:`u_2`
- ``u4``: the running of the coupling :math:`u_4`
- ``u6``: the running of the coupling :math:`u_6`
The number of computed steps is:
.. math::
S = \lfloor \frac{x - x_{\text{IR}}}{\Delta x} \rfloor
"""
# Compute the energy scale
k2: float
u2: float
u4: float
u6: float
k2, u2, u4, u6 = x, u2_init, u4_init, u6_init
if use_naive_euler:
results = []
while k2 >= max(dx, x_ir):
u2, u4, u6 = self._frg_equations_single(
x=k2,
u2=u2,
u4=u4,
u6=u6,
dx=dx,
return_rhs=not use_naive_euler,
)
results.append([k2 - dx, u2, u4, u6])
k2 -= dx
else:
# Define a closure with the signature needed by the solver
def flow_derivative(
t: float,
y: Float[np.ndarray, 3],
) -> Float[np.ndarray, 3]:
u2: float
u4: float
u6: float
u2, u4, u6 = y
return self._frg_equations_single(
x=t,
u2=u2,
u4=u4,
u6=u6,
dx=dx,
return_rhs=True,
)
sol = solve_ivp(
fun=flow_derivative,
t_span=(x, x_ir), # UV to IR
y0=[u2, u4, u6],
method="Radau",
dense_output=True,
)
k2_list: np.ndarray = sol.t
u2_list: np.ndarray
u4_list: np.ndarray
u6_list: np.ndarray
u2_list, u4_list, u6_list = sol.y
results = np.column_stack((k2_list, u2_list, u4_list, u6_list))
return np.array(results)
def _frg_equations_lpa_single(
self,
x: float,
kappa: float,
u4: float,
u6: float,
dx: float = 1.0e-9,
return_rhs: bool = False,
) -> Float[np.ndarray, 3]:
r"""Compute the FRG equations (LPA) at a given point.
Parameters
----------
x : float
The momentum scale :math:`k^2` at which to start computing the FRG equations. This represents an energy scale in the UV region. The FRG computation ends in the IR region.
kappa : float
The initial value of the position of the field :math:`\chi` at energy scale :math:`x`.
u4 : float
The initial value of the coupling :math:`u_4` at energy scale :math:`x`.
u6 : float
The initial value of the coupling :math:`u_6` at energy scale :math:`x`.
dx : float
The step size, by default ``1.0e-9``.
return_rhs : bool
Whether to return the right-hand side of the FRG equations instead of updating the couplings, by default False.
Returns
-------
ndarray of floats
An array containing, in columns (shape: :math:`3`):
- ``kappa``: the running of the position of the zero :math:`\kappa`
- ``u4``: the running of the coupling :math:`u_4`
- ``u6``: the running of the coupling :math:`u_6`
"""
# Compute the prefactor
pref: float = self.ipdf(x) / self.icdf(x)
# Canonical dimensions
dimu4: float
dimu6: float
dimchi: float
_, dimu4, dimu6, dimchi = self.canonical_dimensions(x).T
# Right Hand Sides
kappa_rhs: float = (
-dimchi * kappa
+ 2.0
* (3.0 + 2.0 * kappa * u6 / u4)
/ (1.0 + 2.0 * kappa * u4) ** 2
)
u4_rhs: float = (
-dimu4 * u4
+ dimchi * kappa * u6
- 10.0 * u6 / (1.0 + 2.0 * kappa * u4) ** 2
+ 4.0
* (3.0 * u4 + 2.0 * kappa * u6) ** 2
/ (1.0 + 2.0 * kappa * u4) ** 3
)
u6_rhs: float = (
-dimu6 * u6
- 12.0
* (3.0 * u4 + 2.0 * kappa * u6) ** 3
/ (1.0 + 2.0 * kappa * u4) ** 4
+ 40.0
* u6
* (3.0 * u4 + 2.0 * kappa * u6)
/ (1.0 + 2.0 * kappa * u4)
)
# Return the continuous physical derivative for SciPy
if return_rhs:
return pref * np.array([kappa_rhs, u4_rhs, u6_rhs])
# Equations (the - sign indicates the direction of the flow to IR)
kappa -= dx * pref * kappa_rhs
u4 -= dx * pref * u4_rhs
u6 -= dx * pref * u6_rhs
return np.array([kappa, u4, u6])
def frg_equations_lpa(
self,
x: float,
kappa_init: float,
u4_init: float,
u6_init: float,
dx: float = 1.0e-9,
x_ir: float = 0.0,
use_naive_euler: bool = True,
) -> Float[np.ndarray, n_steps, 4]:
r"""Compute the FRG equations (LPA).
Given initial conditions of the zero of the potential :math:`\kappa` and the couplings :math:`u_2`, and :math:`u_4` at a given momentum scale :math:`k^2`, the function returns the values of the couplings at the scale :math:`k^2 - \Delta_k` in **Local Potential Approximation** (LPA). That is, the function returns the running of the couplings from the UV region towards the IR (:math:`k^2 \to 0`):
.. math::
\dot{\kappa} = - \text{dim}_{\chi} \kappa + 2 \frac{3 + 2 \kappa \frac{u_6}{u_4}}{(1 + 2 \kappa u_4)^2}
.. math::
\dot{u}_4 = - \text{dim}_{u_4} u_4 + \text{dim}_{\chi} \kappa u_6 - 10 \frac{u_6}{(1 + 2 \kappa u_4)^2} + 4 \frac{(3 u_4 + 2 \kappa u_6)^2}{(1 + 2 \kappa u_4)^3}
.. math::
\dot{u}_6 = - \text{dim}_{u_6} u_6 - 12 \frac{(3 u_4 + 2 \kappa u_6)^3}{(1 + 2 \kappa u_4)^4} + 40 \frac{u_6 (3 u_4 + 2 \kappa u_6)}{(1 + 2 \kappa u_4)^2}
Parameters
----------
x : float
The momentum scale :math:`k^2` at which to start computing the FRG equations. This represents an energy scale in the UV region. The FRG computation ends in the IR region.
kappa_init : float
The initial value of the position of the zero of the potential :math:`\kappa` at energy scale :math:`x`.
u4_init : float
The initial value of the coupling :math:`u_4` at energy scale :math:`x`.
u6_init : float
The initial value of the coupling :math:`u_6` at energy scale :math:`x`.
dx : float
The step size, by default 1.0e-9.
x_ir : float
The definition of IR scale (i.e. the scale :math:`k^2` to be considered low-energy), by default 0.0. It can be overridden to match physical assumptions (e.g. histogram binning, etc.).
use_naive_euler : bool
Whether to use the naive Euler method for integration, by default True. If False, a more sophisticated integration method will be used.
Returns
-------
ndarray of floats
An array containing, in columns (shape: :math:`S \times 4` where :math:`S` is the number of steps):
- ``k2``: the energy scale corresponding to the couplings
- ``kappa``: the running of the zero of the potential :math:`\kappa`
- ``u4``: the running of the coupling :math:`u_4`
- ``u6``: the running of the coupling :math:`u_6`
The number of computed steps is:
.. math::
S = \lfloor \frac{x - x_{\text{IR}}}{\Delta x} \rfloor
"""
# Compute the energy scale
k2: float
kappa: float
u4: float
u6: float
k2, kappa, u4, u6 = x, kappa_init, u4_init, u6_init
if use_naive_euler:
results = []
while k2 >= max(dx, x_ir):
kappa, u4, u6 = self._frg_equations_lpa_single(
x=k2,
kappa=kappa,
u4=u4,
u6=u6,
dx=dx,
return_rhs=not use_naive_euler,
)
results.append([k2 - dx, kappa, u4, u6])
k2 -= dx
else:
# Define a closure with the signature needed by the SciPy solver
def flow_derivative(
t: float,
y: Float[np.ndarray, 3],
) -> Float[np.ndarray, 3]:
kappa_val: float
u4_val: float
u6_val: float
kappa_val, u4_val, u6_val = y
return self._frg_equations_lpa_single(
x=t,
kappa=kappa_val,
u4=u4_val,
u6=u6_val,
dx=dx,
return_rhs=True,
)
sol = solve_ivp(
fun=flow_derivative,
t_span=(x, x_ir), # UV to IR
y0=[kappa, u4, u6],
method="Radau",
dense_output=True,
)
k2_list: np.ndarray = sol.t
kappa_list: np.ndarray
u4_list: np.ndarray
u6_list: np.ndarray
kappa_list, u4_list, u6_list = sol.y
results = np.column_stack((k2_list, kappa_list, u4_list, u6_list))
return np.array(results)
[docs]
class EmpiricalDistribution(Distribution):
"""The empirical distribution uses a **Kernel Density Estimation** (KDE) to fit the spectrum of the theory and analyse it."""
def __init__(
self,
n_samples: int,
var: float = 1.0,
ratio: float = 0.5,
is_poisson: bool = False,
poisson_data: bool = False,
poisson_lam: float = 10.0,
poisson_centering: Literal[
"centered",
"non-centered",
"mirrored",
] = "centered",
seed: int = 42,
):
r"""Initialize the empirical distribution.
Parameters
----------
n_samples : int
The number of samples in the distribution.
var : float
The variance of the distribution.
ratio : float
The ratio between the number of variables and the dimension of the sample/population.
.. note::
Let :math:`X \in \mathbb{R}^{n \times p}` be a real matrix where samples are **row** vectors, then the ``ratio`` parameter is :math:`\frac{p}{n}`.
is_poisson : bool
Add Poisson noise (e.g. photon counting), by default ``False``.
poisson_data : bool
Whether to add Poisson noise directly to the data (if ``True``) or to the covariance matrix (if ``False``), by default ``False``.
poisson_lam : float
The rate parameter for the Poisson distribution, by default ``10.0``.
poisson_centering : Literal["centered", "non-centered", "mirrored"]
The centering method for Poisson noise, by default ``"centered"``. Must be one of:
- "centered": Center the Poisson noise by removing the expected value from the generated sample.
- "non-centered": Do not center the Poisson noise.
- "mirrored": Mirror the Poisson noise by generating two samples and subtracting them (Skellam).
seed : int
The random seed, by default ``42``.
Warns
-----
UserWarning
If the size of the sample is less than 1000 (sample too small)
"""
super().__init__()
self._validate_params(
n_samples,
var,
ratio,
poisson_lam,
poisson_centering,
)
self.n_samples: int = int(n_samples)
self.var: float = float(var)
self.ratio: float = float(ratio)
self.n_vars: int = int(n_samples * ratio)
self.is_poisson: bool = bool(is_poisson)
self.poisson_data: bool = bool(poisson_data)
self.poisson_lam: float = float(poisson_lam)
self.poisson_centering: Literal[
"centered",
"non-centered",
"mirrored",
] = poisson_centering
self.seed: int = int(seed)
self._fitted: bool = False
self._iscov: bool = False
# Generate the background distribution
gen: Generator = np.random.default_rng(self.seed)
self._noise: Float[np.ndarray, n_samples, n_vars] = (
self._generate_base_noise(gen)
)
# Handle Poisson noise
if self.is_poisson:
self._handle_poisson_noise(gen)
# Prepare data
self.data: Float[np.ndarray, n_samples, n_vars] = self._noise
self.eigenvectors_: Float[np.ndarray, n_samples, n_vars] | None = None
self.eigenvalues_: Float[np.ndarray, n_vars] | None = None
def _validate_params(
self,
n_samples: int,
var: float,
ratio: float,
poisson_lam: float,
poisson_centering: str,
) -> None:
"""Validate the input parameters.
Raises
------
ValueError
If the size of the sample is less than 2.
ValueError
If variance is negative.
ValueError
If ratio is not strictly positive.
ValueError
If the Poisson rate parameter is negative.
ValueError
If poisson_centering is not one of "centered", "non-centered", or "mirrored".
"""
if n_samples < 2:
raise ValueError(
"The number of samples must be a large number but got %d < 2!"
% n_samples,
)
if n_samples < 1000:
warn(
"The number of samples is low (%d) and the results might suffer from numerical instabilities"
% n_samples,
category=UserWarning,
)
if var < 0.0:
raise ValueError(
"The variance must be non-negative but got %f < 0" % var,
)
if ratio <= 0.0:
raise ValueError(
"Ratio must be strictly positive but got %f <= 0" % ratio,
)
if poisson_lam < 0.0:
raise ValueError(
"The rate parameter for the Poisson distribution must be non-negative but got %f < 0"
% poisson_lam,
)
if poisson_centering not in ["centered", "non-centered", "mirrored"]:
raise ValueError(
"poisson_centering must be one of 'centered', 'non-centered', or 'mirrored' but got %s"
% poisson_centering,
)
def _generate_base_noise(
self,
gen: Generator,
) -> Float[np.ndarray, n_samples, n_vars]:
r"""Generate the base background noise.
Returns
-------
ndarray of floats
The generated noise matrix (shape: :math:`n \times p`).
"""
if self.var > 0.0:
return gen.normal(
loc=0.0,
scale=np.sqrt(self.var),
size=(self.n_samples, self.n_vars),
)
return np.zeros((self.n_samples, self.n_vars))
def _handle_poisson_noise(self, gen: Generator) -> None:
"""Handle the addition of Poisson noise."""
# Case 1: Poisson noise is added directly to the data
if self.poisson_data:
self._noise = self._generate_poisson_samples(gen)
self.var = (
self.poisson_lam
if self.poisson_centering != "mirrored"
else 2.0 * self.poisson_lam
)
# Case 2: Poisson noise is added to the covariance matrix
else:
base_noise: Float[np.ndarray, n_samples, n_vars] = (
self._generate_poisson_samples(gen)
)
self._cov_noise: Float[np.ndarray, n_vars, n_vars] = (
base_noise.T @ base_noise
) / max(1.0, float(self.n_samples - 1))
def _generate_poisson_samples(
self,
gen: Generator,
) -> Float[np.ndarray, n_samples, n_vars]:
r"""Generate Poisson noise samples based on the centering method.
Returns
-------
ndarray of floats
The generated Poisson noise matrix (shape: :math:`n \times p`).
"""
if self.poisson_centering == "non-centered":
return gen.poisson(
lam=self.poisson_lam,
size=(self.n_samples, self.n_vars),
)
if self.poisson_centering == "centered":
return (
gen.poisson(
lam=self.poisson_lam,
size=(self.n_samples, self.n_vars),
)
- self.poisson_lam
)
# mirrored
return gen.poisson(
lam=self.poisson_lam,
size=(self.n_samples, self.n_vars),
) - gen.poisson(
lam=self.poisson_lam,
size=(self.n_samples, self.n_vars),
)
@property
def data(self) -> Float[np.ndarray, n_samples, n_vars]:
r"""Return the data matrix.
Returns
-------
ndarray of floats
The data matrix (shape: :math:`n \times p`).
"""
return self._data
@data.setter
def data(self, value: Float[np.ndarray, n_samples, n_vars]) -> None:
r"""Set the data matrix.
Parameters
----------
value : ndarray of floats
The data matrix (shape: :math:`n \times p`).
Raises
------
ValueError
If the data matrix is not 2-dimensional.
"""
if value.ndim != 2:
raise ValueError(
"The data matrix must be 2-dimensional but got %d-dimensional"
% value.ndim,
)
self._data: Float[np.ndarray, n_samples, n_vars] = value
[docs]
@classmethod
def from_config(cls, cfg: CfgNode) -> EmpiricalDistribution:
"""Create an instance of the class from a configuration.
Parameters
----------
cfg : CfgNode
The configuration.
Returns
-------
EmpiricalDistribution
An instance of the class.
"""
return cls(
n_samples=cfg.DIST.NUM_SAMPLES,
var=cfg.DIST.VAR,
ratio=cfg.DIST.RATIO,
seed=cfg.DIST.SEED,
is_poisson=cfg.DIST.IS_POIS,
poisson_data=cfg.DIST.POIS_DATA,
poisson_lam=cfg.DIST.POIS_LAM,
poisson_centering=cfg.DIST.POIS_MODE,
)
[docs]
@classmethod
def from_covariance(
cls,
cov: Float[np.ndarray, n_samples, n_vars],
cfg: CfgNode,
) -> EmpiricalDistribution:
r"""Create an instance of the class from a covariance matrix.
Parameters
----------
cov : ndarray of floats
The covariance matrix (shape: :math:`n \times p`).
cfg : CfgNode
The configuration
Returns
-------
EmpiricalDistribution
An instance of the class.
"""
instance: EmpiricalDistribution = cls.from_config(cfg)
instance.data: Float[np.ndarray, n_samples, n_vars] = cov
instance._iscov = True
return instance
[docs]
def add_signal(
self,
X: Float[np.ndarray, n_samples_sig, n_vars_sig],
snr: float = 0.0,
) -> EmpiricalDistribution:
r"""Add a signal to the distribution.
Parameters
----------
X : ndarray of floats
The signal to add (shape: :math:`n_S \times p_S`). If :math:`n_S \neq n` or :math:`p_S \neq p`, the signal matrix will be resized to match the background.
snr : float
The signal-to-noise ratio, by default ``0.0``. If negative, then only signal is kept and noise is discarded.
Returns
-------
EmpiricalDistribution
A new instance of the class with the signal added.
Warns
-----
UserWarning
If the distribution was created from a covariance matrix, adding a signal may lead to unexpected results.
Raises
------
ValueError
If the signal matrix is not 2-dimensional.
"""
if self._iscov:
warn(
"You are adding a signal to a distribution created from a covariance matrix. This may lead to unexpected results.",
)
if X.ndim != 2:
raise ValueError(
"The signal matrix must be 2-dimensional but got %d-dimensional"
% X.ndim,
)
# Add the signal to the background
n: int
p: int
n, p = self.data.shape
nS: int
pS: int
nS, pS = X.shape
signal: Float[np.ndarray, n_samples_sig, n_vars_sig] = (
resize(X, output_shape=self.data.shape[:2])
if ((n != nS) or (p != pS))
else X
)
if snr >= 0.0:
self.data += snr * signal
else:
self.data: Float[np.ndarray, n_samples, n_vars] = signal
return self
def _cov_eigh(
self,
X: Float[np.ndarray, n_samples, n_vars],
is_cov: bool = False,
) -> Float[np.ndarray, n_vars]:
r"""Get the eigenvalues of the covariance matrix of the data (from the singular values of the data).
Parameters
----------
X : ndarray of floats
The data matrix (shape: :math:`n \times p`).
is_cov : bool
Data are a covariance matrix, by default `False`.
Returns
-------
ndarray of floats
The eigenvalues of the distribution (shape: :math:`p`).
"""
if is_cov:
cov: Float[np.ndarray, n_vars, n_vars] = X.copy()
else:
cov: Float[np.ndarray, n_vars, n_vars] = (X.T @ X) / max(
1.0,
float(self.n_samples - 1.0),
)
if hasattr(self, "_cov_noise"):
cov += self._cov_noise
# Compute the eigendecomposition
evl: Float[np.ndarray, n_vars]
evc: Float[np.ndarray, n_vars, n_vars]
evl, evc = np.linalg.eigh(cov)
self.eigenvalues_: Float[np.ndarray, n_vars] = evl
self.eigenvectors_: Float[np.ndarray, n_vars, n_vars] = evc
return evl
@property
def eigenvalues(self) -> Float[np.ndarray, n_vars]:
"""Compute the eigenvalues of the distribution.
Notes
-----
This is the complete list of eigenvalues (bulk + spikes). You can access the filtered distribution **without** the spikes using the ``self.eigenvalues_`` attribute, available after calling the ``self.fit(...)`` method.
Returns
-------
ndarray of floats
The eigenvalues of the distribution, sorted in ascending order (shape: :math:`p`).
"""
evl: Float[np.ndarray, n_vars] = self._cov_eigh(
self.data,
is_cov=self._iscov,
)
assert self.eigenvalues_ is not None, "Error: Eigenvalues not computed!"
assert self.eigenvectors_ is not None, (
"Error: Eigenvectors not computed!"
)
# Sort the eigenvalues
idx: Integer[np.ndarray, n_vars] = np.argsort(evl)
self.eigenvalues_ = evl[idx]
self.eigenvectors_ = self.eigenvectors_[:, idx]
return evl[idx]
[docs]
def find_spikes(
self,
eigenvalues: Float[np.ndarray, n_vars],
pow: float = 0.5,
) -> int:
"""Find the spikes in the eigenvalues.
The search for spikes is done by measuring the distance between eigenvalues from the IR to the UV. When eigenvalues start to form a continuous bulk (i.e. the difference between consecutive eigenvalues is smaller than a threshold), they are considered part of the bulk.
Parameters
----------
eigenvalues : ndarray of floats
The eigenvalues of the distribution (shape: :math:`p`).
pow : float
Continuity criterion of the eigenvalues, by default ``0.5``.
Returns
-------
int
The index of the first spike (which equals the number of eigenvalues in the bulk).
"""
dx: float = len(eigenvalues) ** (-pow)
# Find the first gap from the IR (left) that is larger than dx
diff: Float[np.ndarray, n_vars - 1] = np.diff(eigenvalues)
large_gaps: np.ndarray = np.where(diff > dx)[0]
if len(large_gaps) == 0:
# All gaps are small, the entire spectrum is considered
# the continuous bulk
return len(eigenvalues)
if len(large_gaps) == len(diff) or np.median(diff) > dx:
# All gaps are uniformly large (e.g. scaled variance)
# Only the most UV eigenvalue is considered a spike
return 1
# The bulk ends right before the first large gap
return int(large_gaps[0]) + 1
def _find_pow(self, pow: float, snr: float) -> float:
"""Interpolate a power factor as a function of the SNR to ensure to eliminate all spikes.
Parameters
----------
pow : float
The power factor.
snr : float
The actual signal-to-noise ratio.
Returns
-------
float
The spike-finding factor.
"""
coeff: float = (pow - 0.5) / 5.0
return coeff * snr + 0.5
[docs]
def fit(
self,
X: Float[np.ndarray, n_samples, n_vars] | None = None,
snr: float = 0.0,
fac: float = 0.3,
) -> EmpiricalDistribution:
r"""Add the signal (if provided) and compute the eigenvalue distribution.
Parameters
----------
X : ndarray of floats, optional
The signal to add (shape: :math:`n \times p`).
snr : float
The signal-to-noise ratio.
fac : float
Bandwidth factor. By deault `0.3`.
Returns
-------
EmpiricalDistribution
A new instance of the class with the signal added and the eigenvalue distribution computed.
"""
if X is not None:
self.add_signal(X, snr=snr)
# Remove the spikes from the eigenvalues and fit a KDE
evls: Float[np.ndarray, n_vars] = self.eigenvalues
spikes: int = self.find_spikes(
eigenvalues=evls,
pow=self._find_pow(pow=self.ratio, snr=snr),
)
assert self.eigenvalues_ is not None, "Error: Eigenvalues not computed!"
assert self.eigenvectors_ is not None, (
"Error: Eigenvectors not computed!"
)
self.eigenvalues_: Float[np.ndarray, n_vars - n_spikes] = evls[:spikes]
self.eigenvectors_: Float[np.ndarray, n_vars, n_vars - n_spikes] = (
self.eigenvectors_[:, :spikes]
)
# Avoid KDE crash if there's only 1 eigenvalue left in the bulk
if len(self.eigenvalues_) == 1:
val: float = float(self.eigenvalues_[0])
self.eigenvalues_ = np.array([val - 1.0e-5, val + 1.0e-5])
self.kde = gaussian_kde(
self.eigenvalues_,
bw_method=lambda obj: np.power(obj.n, -1.0 / (obj.d + 4.0)) * fac,
)
# Compute the min and max points
self.lminus: float = float(self.var * (1.0 - np.sqrt(self.ratio)) ** 2)
self.lplus_mp: float = float(
self.var * (1.0 + np.sqrt(self.ratio)) ** 2,
)
self.lplus: float = float(max(self.eigenvalues_))
self.m2: float = 1.0 / (self.lplus - self.lminus)
self.m2_mp: float = 1.0 / (self.lplus_mp - self.lminus)
self.norm: float = float(
self.kde.integrate_box_1d(self.lminus, self.lplus),
)
self._fitted: bool = True
return self
@overload
def pdf(self, x: float) -> float: ...
@overload
def pdf(
self,
x: Float[np.ndarray, n_samples],
) -> Float[np.ndarray, n_samples]: ...
[docs]
def pdf(
self,
x: float | Float[np.ndarray, n_samples],
) -> float | Float[np.ndarray, n_samples]:
"""Compute the value of the *Probability Density Function* (PDF) of the eigenvalue distribution at given value(s) :math:`x`.
Parameters
----------
x : float or ndarray of floats
The value(s) at which to evaluate the PDF (shape: scalar or array of shape :math:`n`).
Returns
-------
float or ndarray of floats
The value(s) of the PDF at the given value(s) (shape: scalar or array of shape :math:`n`).
Raises
------
ValueError
If the distribution has not been fitted yet.
"""
if not self._fitted:
raise ValueError(
"The distribution must be fitted before calling `pdf`! Please call `self.fit()` first.",
)
if not isinstance(x, float):
return np.vectorize(self.pdf, otypes=[np.float64])(x)
if (x <= self.lminus) or (x >= self.lplus):
return 0.0
return float(self.kde.pdf(x)) / self.norm
@overload
def cdf(self, x: float) -> float: ...
@overload
def cdf(
self,
x: Float[np.ndarray, n_samples],
) -> Float[np.ndarray, n_samples]: ...
[docs]
def cdf(
self,
x: float | Float[np.ndarray, n_samples],
) -> float | Float[np.ndarray, n_samples]:
"""Compute the CDF of the eigenvalue distribution.
Parameters
----------
x : float or ndarray of floats
The value(s) at which to evaluate the CDF (shape: scalar or array of shape :math:`n`).
Returns
-------
float or ndarray of floats
The value(s) of the CDF at the given value(s) (shape: scalar or array of shape :math:`n`).
Raises
------
ValueError
If the distribution has not been fitted yet.
"""
if not self._fitted:
raise ValueError(
"The distribution must be fitted before calling `cdf`! Please call `self.fit()` first.",
)
if not isinstance(x, float):
return np.vectorize(self.cdf, otypes=[np.float64])(x)
# Physical bounds
if x <= self.lminus:
return 0.0
if x >= self.lplus:
return 1.0
val: float = float(self.kde.integrate_box_1d(self.lminus, x))
return val / self.norm
@overload
def icdf(self, x: float) -> float: ...
@overload
def icdf(
self,
x: Float[np.ndarray, n_samples],
) -> Float[np.ndarray, n_samples]: ...
[docs]
def icdf(
self,
x: float | Float[np.ndarray, n_samples],
) -> float | Float[np.ndarray, n_samples]:
"""Compute the CDF of the distribution of the inverse of the eigenvalues.
Parameters
----------
x : float or ndarray of floats
The value(s) at which to evaluate the CDF of the momenta (shape: scalar or array of shape :math:`n`).
Returns
-------
float or ndarray of floats
The value(s) of the CDF of the momenta (shape: scalar or array of shape :math:`n`).
Raises
------
ValueError
If the distribution has not been fitted yet.
"""
if not self._fitted:
raise ValueError(
"The distribution must be fitted before calling ipdf! Please call ``self.fit()`` first.",
)
if not isinstance(x, float):
return np.vectorize(self.icdf, otypes=[np.float64])(x)
# Let
# lambda' = lambda - lambda_-
# and
# lambda' = 1 / (k^2 + m^2) <==> k^2 = 1 / lambda' - self.m2.
#
# We can then compute the CDF of the random variable K^2 by computing:
#
# P(K^2 < k^2) = P(1 / lambda' < k^2 + m^2)
# = P(lambda' > 1 / (k^2 + m^2))
# = P(lambda > lambda_- + 1 / (k^2 + m^2))
# = 1 - P(lambda < lambda_- + 1 / (k^2 + m^2))
# = 1 - CDF(lambda_- + 1 / (k^2 + m^2))
if x <= 0.0:
return 0.0
return 1.0 - self.cdf(self.lminus + 1.0 / (x + self.m2))
[docs]
class MarchenkoPastur(Distribution):
"""The **Marchenko-Pastur** distribution represents the distribution of eigenvalues of large random covariance matrices."""
def __init__(self, ratio: float, var: float):
r"""Initialize the Marchenko-Pastur distribution.
Parameters
----------
ratio : float
The ratio between the number of variables and the dimension of the sample/population.
.. note::
Let :math:`X \in \mathbb{R}^{n \times p}` be a real matrix where samples are **row** vectors, then the ``ratio`` parameter is :math:`\frac{p}{n}`.
var : float
The variance of the distribution.
Warns
-----
UserWarning
If ratio is higher than 1 because of numerical instabilities
Raises
------
ValueError
If ratio is not strictly positive
ValueError
If the lowest eigenvalue is higher than the highest one
"""
if ratio <= 0.0:
raise ValueError(
"Ratio must be strictly positive but got %f <= 0" % ratio,
)
if ratio >= 1.0:
warn(
"Ratios higher than 1 may result in numerical instabilities. We got %f >= 1"
% ratio,
category=UserWarning,
)
if var < 0.0:
raise ValueError(
"Variance must be non-negative but got %f < 0" % var,
)
# Store the parameters
self.ratio: float = float(ratio)
self.var: float = float(var)
# Compute the highest and lowest eigenvalues
self.lplus: float = self.var * (1.0 + np.sqrt(self.ratio)) ** 2
self.lminus: float = self.var * (1.0 - np.sqrt(self.ratio)) ** 2
self.m2: float = 1 / (self.lplus - self.lminus)
if self.lminus > self.lplus:
raise ValueError(
"The smallest eigenvalue must be lower than the largest one, but got lminus = %f > %f = lplus!"
% (self.lminus, self.lplus),
)
@overload
def pdf(self, x: float) -> float: ...
@overload
def pdf(
self,
x: Float[np.ndarray, n_samples],
) -> Float[np.ndarray, n_samples]: ...
[docs]
def pdf(
self,
x: float | Float[np.ndarray, n_samples],
) -> float | Float[np.ndarray, n_samples]:
r"""Compute the PDF of the Marchenko-Pastur distribution.
Notes
-----
The PDF uses this mathematical formula:
.. math::
\mu_{\text{MP}}(x) = \frac{\sqrt{(\lambda_+ - x)(x - \lambda_-)}}{2 \pi \sigma^2 q \, x},
where
.. math::
\lambda_{\pm} = \sigma^2 (1 \pm \sqrt{q})^2,
and
.. math::
q = \frac{p}{n}.
Parameters
----------
x : float or ndarray of floats
The value(s) at which to evaluate the PDF (shape: scalar or array of shape :math:`n`).
Returns
-------
float or ndarray of floats
The value(s) of the PDF at the given value(s) (shape: scalar or array of shape :math:`n`).
"""
if not isinstance(x, float):
return np.vectorize(self.pdf, otypes=[np.float64])(x)
if (x <= self.lminus) or (x >= self.lplus):
return 0.0
num: float | Float[np.ndarray, n_samples] = np.sqrt(
(self.lplus - x) * (x - self.lminus),
)
den: float | Float[np.ndarray, n_samples] = (
2.0 * np.pi * self.var * self.ratio * x
)
return num / den if den != 0.0 else 0.0
@overload
def cdf(self, x: float) -> float: ...
@overload
def cdf(
self,
x: Float[np.ndarray, n_samples],
) -> Float[np.ndarray, n_samples]: ...
[docs]
def cdf(
self,
x: float | Float[np.ndarray, n_samples],
) -> float | Float[np.ndarray, n_samples]:
"""Compute the CDF of the Marchenko-Pastur distribution.
Parameters
----------
x : float | np.ndarray
The value(s) at which to evaluate the CDF.
Returns
-------
float | np.ndarray
The value(s) of the CDF at the given value(s).
"""
if not isinstance(x, float):
return np.vectorize(self.cdf, otypes=[np.float64])(x)
if x <= self.lminus:
return 0.0
if x >= self.lplus:
return 1.0
return quad(lambda y: self.pdf(y), self.lminus, x)[0]