from __future__ import annotations
from typing import Generic, overload
import sys
if sys.version_info < (3, 11):
from typing_extensions import Self
else:
from typing import Self
from abc import ABC, abstractmethod
import bisect
import math
import numpy as np
from .arraytypes import MeterArray, TruePeakArray
from .types import (
NumChannelsT, Float1dArray, Float2dArray, Any1dArray,
Floating, FloatArray,
)
from .typeutils import ensure_nd_array, build_meter_array, build_true_peak_array
from .filters import TruePeakFilter
__all__ = ('BlockProcessor', 'TruePeakProcessor')
SILENCE_DB: Floating = np.float64(-200.)
EPSILON: Floating = np.float64(1e-20)
NEG_INFINITY: Floating = np.float64(-np.inf)
[docs]
class RunningSum:
"""Helper class to calculate the running sum of a series of values
"""
__slots__ = ('_value', '_count', '_mean')
def __init__(self) -> None:
self._value: Floating = np.float64(0)
self._count: int = 0
self._mean: Floating|None = None
@property
def value(self) -> Floating:
"""The current running sum
"""
return self._value
@value.setter
def value(self, value: Floating) -> None:
if value == self._value:
return
self._value = value
self._mean = None
@property
def count(self) -> int:
"""The number of values in the running sum
"""
return self._count
@count.setter
def count(self, count: int) -> None:
if count == self._count:
return
self._count = count
self._mean = None
@property
def mean(self) -> Floating:
"""The mean of the running sum
"""
m = self._mean
if m is None:
m = self._mean = self._value / self._count
return m
[docs]
def add(self, value: Floating) -> None:
"""Add a data point to the running sum
"""
self.value += value
self.count += 1
[docs]
def clear(self) -> None:
"""Reset :attr:`value` and :attr:`count` to zero
"""
self.value = np.float64(0)
self.count = 0
def __iadd__(self, value: Floating) -> Self:
self.add(value)
return self
def __eq__(self, other: object) -> bool:
if not isinstance(other, (np.floating, float, int)):
return NotImplemented
return self.value == other
def __gt__(self, other: Floating) -> bool|np.bool:
return self.value > other
def __ge__(self, other: Floating) -> bool|np.bool:
return self.value >= other
def __lt__(self, other: Floating) -> bool|np.bool:
return self.value < other
def __le__(self, other: Floating) -> bool|np.bool:
return self.value <= other
def __repr__(self) -> str:
return f'<{self.__class__.__name__}: {self}>'
def __str__(self) -> str:
return f'value={float(self.value)}, count={self.count}'
@overload
def lk_log10(x: FloatArray, offset: float = -0.691, base: int = 10, allow_negative_inf: bool = ...) -> FloatArray: ...
@overload
def lk_log10(x: np.floating, offset: float = -0.691, base: int = 10, allow_negative_inf: bool = ...) -> np.floating: ...
def lk_log10(
x: FloatArray|np.floating,
offset: float = -0.691,
base: int = 10,
allow_negative_inf: bool = False,
) -> FloatArray|np.floating:
if allow_negative_inf:
with np.errstate(divide='ignore'):
r = offset + base * np.log10(x)
return r
if isinstance(x, np.ndarray):
x[np.less_equal(x, 0)] = EPSILON
return offset + base * np.log10(x)
if x <= 0:
x = EPSILON
return np.float64(offset + base * math.log10(x))
@overload
def from_lk_log10(x: FloatArray, offset: float = 0.691) -> FloatArray: ...
@overload
def from_lk_log10(x: np.floating, offset: float = 0.691) -> np.floating: ...
def from_lk_log10(
x: FloatArray|np.floating,
offset: float = 0.691
) -> FloatArray|np.floating:
return 10 ** ((x + offset) / 10)
[docs]
def square_and_sum(a: Float2dArray) -> Float1dArray:
"""Compute the per-row sum-of-squares of a 2-D array
Equivalent to ``np.square(a).sum(axis=1)`` but avoids allocating the
intermediate ``(rows, cols)`` squared array, which can be large when
*a* has many columns.
"""
return np.einsum('ij,ij->i', a, a)
def _sorted_quantile(sorted_vals: list[float], lo: int, n: int, q: float) -> float:
"""Compute a quantile from a pre-sorted list slice using linear interpolation.
Matches ``numpy.quantile(arr, q, method='linear')`` exactly.
Parameters:
sorted_vals: the full sorted list
lo: start index of the relevant slice within *sorted_vals*
n: number of elements in the slice
q: quantile in [0, 1]
"""
idx = q * (n - 1)
i_lo = int(idx)
i_hi = min(i_lo + 1, n - 1)
frac = idx - i_lo
v_lo = sorted_vals[lo + i_lo]
v_hi = sorted_vals[lo + i_hi]
return v_lo + frac * (v_hi - v_lo)
[docs]
class BaseProcessor(ABC, Generic[NumChannelsT]):
"""Abstract base class for audio measurement processors
"""
num_channels: NumChannelsT
"""Number of audio channels"""
sample_rate: int
"""The sample rate of the audio data"""
def __init__(self, num_channels: NumChannelsT, sample_rate: int = 48000) -> None:
self.num_channels = num_channels
self.sample_rate = sample_rate
[docs]
@abstractmethod
def __call__(self, samples: Float2dArray) -> None:
"""Process one :term:`gating block`
Input data must be of shape ``(num_channels, gate_size)``
"""
raise NotImplementedError
[docs]
@abstractmethod
def reset(self) -> None:
"""Reset all measurement data
"""
raise NotImplementedError
[docs]
class BlockProcessor(BaseProcessor[NumChannelsT]):
"""Process audio samples and store the resulting loudness data
Arguments:
num_channels: Number of audio channels
gate_size: Length of one :term:`gating block` in samples
sample_rate: Sample rate of the audio data (default: 48000)
momentary_enabled: Enable :term:`Momentary Loudness` processing
(default: ``True``)
short_term_enabled: Enable :term:`Short-Term Loudness` processing
(default: ``True``)
lra_enabled: Enable :term:`Loudness Range` processing
(default: ``True``)
.. important::
If *short_term_enabled* is ``False``, *lra_enabled* must also be
``False``. :term:`Loudness Range` calculation depends on
:term:`Short-Term Loudness` values.
Raises:
ValueError: If *short_term_enabled* is ``False`` and *lra_enabled*
is ``True``
"""
gate_size: int
"""The length of one :term:`gating block` in samples"""
integrated_lkfs: Floating
"""The current :term:`Integrated Loudness`"""
lra: float
"""The current :term:`Loudness Range`
If :attr:`lra_enabled` is ``False``, this will always be ``0``
"""
MAX_BLOCKS = 144000 # <- 14400 seconds (4 hours) / .1 (100 milliseconds)
_channel_weights = np.array([1, 1, 1, 1.41, 1.41])
_block_data: MeterArray
_Zij: Float2dArray
_block_weighted_sums: Float1dArray
_quarter_block_weighted_sums: Float1dArray
_block_loudness: Float1dArray
_blocks_above_abs_thresh: Any1dArray[np.dtype[np.bool_]]
_blocks_above_rel_thresh: Any1dArray[np.dtype[np.bool_]]
def __init__(
self,
num_channels: NumChannelsT,
gate_size: int,
sample_rate: int = 48000,
momentary_enabled: bool = True,
short_term_enabled: bool = True,
lra_enabled: bool = True,
) -> None:
super().__init__(num_channels=num_channels, sample_rate=sample_rate)
self.gate_size = gate_size
self.pad_size = gate_size // 4
if lra_enabled and not short_term_enabled:
raise ValueError("LRA calculation requires short-term loudness to be enabled")
self._momentary_enabled = momentary_enabled
self._short_term_enabled = short_term_enabled
self._lra_enabled = lra_enabled
self.weights = self._channel_weights[:self.num_channels]
self._block_data = build_meter_array(self.MAX_BLOCKS)
self._tg: float = 1.0 / gate_size
self._tp: float = 1.0 / (gate_size // 4)
self._Zij = np.zeros(
(self.num_channels, self.MAX_BLOCKS),
dtype=np.float64,
)
self._block_weighted_sums = np.zeros(self.MAX_BLOCKS, dtype=np.float64)
self._quarter_block_weighted_sums = np.zeros(self.MAX_BLOCKS, dtype=np.float64)
self._block_loudness = np.zeros(self.MAX_BLOCKS, dtype=np.float64)
self._t = self._block_data['t']
self._t[:] = np.arange(self.MAX_BLOCKS) / self.sample_rate * (self.gate_size / 4)
self._blocks_above_abs_thresh = np.zeros(
self.MAX_BLOCKS, dtype=bool
)
self._blocks_above_rel_thresh = np.zeros(
self.MAX_BLOCKS, dtype=bool
)
self._above_abs_running_sum = RunningSum()
self._above_rel_running_sum = RunningSum()
self._lra_abs_power_running_sum = RunningSum()
self._rel_threshold: Floating = np.float64(SILENCE_DB)
self._momentary_sum = RunningSum()
self._short_term_sum = RunningSum()
self._momentary_lkfs: Float1dArray = self._block_data['m']
self._short_term_lkfs: Float1dArray = self._block_data['s']
self.integrated_lkfs = SILENCE_DB
self.lra = 0
self.num_blocks = 0
self.block_index = 0
# Incremental state for LRA calculation (avoids full O(n log n) rescan per block)
self._lra_sorted_abs_gated: list[float] = []
@property
def momentary_enabled(self) -> bool:
"""Whether :term:`Momentary Loudness` processing is enabled (read-only)
"""
return self._momentary_enabled
@property
def short_term_enabled(self) -> bool:
"""Whether :term:`Short-Term Loudness` processing is enabled (read-only)
"""
return self._short_term_enabled
@property
def lra_enabled(self) -> bool:
"""Whether :term:`Loudness Range` processing is enabled (read-only)
"""
return self._lra_enabled
@property
def block_data(self) -> MeterArray:
"""A structured array of measurement values with
dtype :obj:`~.arraytypes.MeterDtype`
"""
return self._block_data[:self.block_index]
@property
def momentary_lkfs(self) -> Float1dArray:
""":term:`Momentary Loudness` for each 100ms block, averaged over 400ms
(not gated)
If :attr:`momentary_enabled` is ``False``, this will be an array of zeros
"""
return self.block_data['m']
@property
def short_term_lkfs(self) -> Float1dArray:
""":term:`Short-Term Loudness` for each 100ms block, averaged over 3 seconds
(not gated)
If :attr:`short_term_enabled` is ``False``, this will be an array of zeros
"""
return self.block_data['s']
@property
def t(self) -> Float1dArray:
"""The measurement time for each element in :attr:`short_term_lkfs`
and :attr:`momentary_lkfs`
"""
return self.block_data['t']
@property
def Zij(self) -> Float2dArray:
"""Mean-squared values per channel in each 400ms block
(not weighted)
"""
zij = self._Zij[:,:self.block_index]
return ensure_nd_array(zij, 2)
[docs]
def reset(self) -> None:
"""Reset all measurement data
"""
n = self.block_index
self.block_data['m'][:] = 0
self.block_data['s'][:] = 0
self._Zij[:, :n] = 0
self._block_weighted_sums[:n] = 0
self._quarter_block_weighted_sums[:n] = 0
self._block_loudness[:n] = 0
self._rel_threshold = SILENCE_DB
self._blocks_above_abs_thresh[:n] = False
self._blocks_above_rel_thresh[:n] = False
self._above_rel_running_sum.clear()
self._above_abs_running_sum.clear()
self._lra_abs_power_running_sum.clear()
self.integrated_lkfs = SILENCE_DB
self.lra = 0
self.block_index = 0
self.num_blocks = 0
self._lra_sorted_abs_gated = []
self._momentary_sum.clear()
self._short_term_sum.clear()
[docs]
def __call__(self, samples: Float2dArray) -> None:
self.process_block(samples)
def __len__(self) -> int:
return self.num_blocks
def _calc_gating(self) -> None:
block_lk = self._block_loudness[:self.block_index+1]
blocks_above_rel_thresh = self._blocks_above_rel_thresh[:self.block_index+1]
blocks_above_abs_thresh = self._blocks_above_abs_thresh[:self.block_index+1]
block_wsums = self._block_weighted_sums[:self.block_index+1]
cur_block_lk = block_lk[-1]
cur_block_wsum = block_wsums[-1]
above_abs = cur_block_lk >= -70
rel_threshold_changed = False
if above_abs:
self._above_abs_running_sum += cur_block_wsum
blocks_above_abs_thresh[-1] = above_abs
if self._above_abs_running_sum == 0:
rel_threshold = SILENCE_DB
else:
rel_threshold = lk_log10(self._above_abs_running_sum.mean) - 10
if rel_threshold != self._rel_threshold:
rel_threshold_changed = True
rs = self._above_rel_running_sum
if rel_threshold_changed:
blocks_above_rel_thresh[:] = block_lk >= rel_threshold
x = block_wsums[np.logical_and(
blocks_above_rel_thresh,
blocks_above_abs_thresh
)]
rs.value = x.sum()
rs.count = x.size
else:
# If the relative threshold hasn't changed, we can avoid doing
# a full array comparison and just add the current block to the running sum.
blocks_above_rel_thresh[-1] = cur_block_lk >= rel_threshold
if cur_block_lk >= rel_threshold and above_abs:
rs += cur_block_wsum
self._rel_threshold = rel_threshold
if not rs.count:
self.integrated_lkfs = SILENCE_DB
else:
self.integrated_lkfs = lk_log10(rs.mean)
def _calc_momentary(self):
block_index = self.block_index
new_val = self._quarter_block_weighted_sums[block_index]
self._momentary_sum.value += new_val
if block_index >= 3:
self._momentary_sum.value -= self._quarter_block_weighted_sums[block_index - 3]
self._momentary_sum.count = 3
else:
self._momentary_sum.count = block_index + 1
r = lk_log10(self._momentary_sum.mean)
if r < SILENCE_DB:
r = SILENCE_DB
self._momentary_lkfs[block_index] = r
def _calc_short_term(self):
block_index = self.block_index
num_blocks = 30 # 3 second window (100ms per block_index)
new_val = self._quarter_block_weighted_sums[block_index]
self._short_term_sum.value += new_val
if block_index >= num_blocks:
self._short_term_sum.value -= self._quarter_block_weighted_sums[block_index - num_blocks]
self._short_term_sum.count = num_blocks
else:
self._short_term_sum.count = block_index + 1
st = lk_log10(self._short_term_sum.mean)
self._short_term_lkfs[block_index] = st
def _calc_lra(self):
block_index = self.block_index
# Update incremental state with the current block's short-term loudness.
# This must happen every block (not just when block_index >= 4) so that
# the sorted list is fully populated when we first compute LRA at block 4.
st_lkfs = float(self._short_term_lkfs[block_index])
if st_lkfs >= -70.0:
bisect.insort(self._lra_sorted_abs_gated, st_lkfs)
# from_lk_log10 inlined as Python float arithmetic to avoid numpy overhead
self._lra_abs_power_running_sum += np.float64(10.0 ** ((st_lkfs + 0.691) / 10.0))
if block_index < 4:
return
if not self._lra_abs_power_running_sum.count:
return
mean_abs_power = self._lra_abs_power_running_sum.mean
st_integrated = lk_log10(np.float64(mean_abs_power))
rel_threshold = float(st_integrated) - 20.0
sorted_vals = self._lra_sorted_abs_gated
n_all = len(sorted_vals)
# Find the count of values >= rel_threshold using binary search
lo_rel = bisect.bisect_left(sorted_vals, rel_threshold)
n_rel = n_all - lo_rel
if not n_rel:
return
self.lra = (
_sorted_quantile(sorted_vals, lo_rel, n_rel, 0.95)
- _sorted_quantile(sorted_vals, lo_rel, n_rel, 0.10)
)
[docs]
def process_block(self, samples: Float2dArray):
"""Process one :term:`gating block`
Input data must be of shape ``(num_channels, gate_size)``
"""
# TODO: Check and raise exception if exceeding MAX_BLOCKS
assert samples.shape == (self.num_channels, self.gate_size)
if self.momentary_enabled or self.short_term_enabled:
# Compute sum-of-squares for the quarter block (last 100ms) first,
# then sum the rest and accumulate. This avoids allocating and reading
# back the full (num_channels, gate_size) squared intermediate twice.
quarter_sq_sum: Float1dArray = square_and_sum(samples[:, -self.pad_size:])
sq_sum: Float1dArray = (
square_and_sum(samples[:, :-self.pad_size]) + quarter_sq_sum
)
self._process_quarter_block(quarter_sq_sum)
else:
sq_sum = square_and_sum(samples)
_Zij = self._tg * sq_sum
assert _Zij.shape == (self.num_channels,)
weighted_sum = np.dot(_Zij, self.weights)
self._block_weighted_sums[self.block_index] = weighted_sum
block_loudness = lk_log10(weighted_sum)
self._block_loudness[self.block_index] = block_loudness
self._Zij[:,self.block_index] = _Zij
self._calc_gating()
if self.momentary_enabled:
self._calc_momentary()
if self.short_term_enabled:
self._calc_short_term()
if self.lra_enabled:
self._calc_lra()
self.block_index += 1
self.num_blocks += 1
def _process_quarter_block(self, quarter_sq_sum: Float1dArray):
# Calculate the weighted squared-sum of the last 100ms segment within
# this 400ms block (for use in momentary/short-term calculation).
# quarter_sq_sum is the per-channel sum of squares for the final pad_size
# samples, already computed in process_block.
weighted_sum = self._tp * np.dot(quarter_sq_sum, self.weights)
self._quarter_block_weighted_sums[self.block_index] = weighted_sum
[docs]
class TruePeakProcessor(BaseProcessor[NumChannelsT]):
"""Process audio samples to extract their :term:`True Peak` values
"""
max_peak: Floating
"""Maximum :term:`True Peak` value detected"""
gate_size: int
"""The length in samples to process per call"""
MAX_TIME_SECONDS = 14400.0 # 4 hours
def __init__(self, num_channels: NumChannelsT, gate_size: int, sample_rate: int = 48000) -> None:
super().__init__(num_channels=num_channels, sample_rate=sample_rate)
up_sample = 4 if sample_rate < 88100 else 2
self.resample_filt = TruePeakFilter(
num_channels=num_channels, upsample_factor=up_sample,
)
self.max_peak = NEG_INFINITY
self.gate_size = gate_size
gate_t = gate_size / sample_rate
max_blocks = int(self.MAX_TIME_SECONDS / gate_t)
self._tp_array: TruePeakArray[NumChannelsT] = build_true_peak_array(
num_channels=self.num_channels,
size=max_blocks,
)
self._t = self._tp_array['t']
self._t[:] = np.arange(max_blocks) * gate_t
self._all_tp_values = self._tp_array['tp']
self._all_tp_values[...] = NEG_INFINITY
self._block_index = 0
@property
def tp_array(self) -> TruePeakArray[NumChannelsT]:
"""A structured array of measurement values with
dtype :obj:`~.arraytypes.TruePeakDtype`
"""
return self._tp_array[:self._block_index]
@property
def t(self) -> Float1dArray:
"""The measurement times for all processed blocks in :attr:`tp_array`
"""
return self.tp_array['t']
@property
def all_tp_values(self) -> np.ndarray[tuple[int, NumChannelsT], np.dtype[np.float64]]:
"""All :term:`True Peak` values per channel for each processed block
"""
return self.tp_array['tp']
@property
def current_peaks(self) -> np.ndarray[tuple[NumChannelsT], np.dtype[np.float64]]:
""":term:`True Peak` values per channel from the last processing period"""
if self._block_index == 0:
return self._all_tp_values[0]
return self.all_tp_values[-1]
[docs]
def __call__(self, samples: Float2dArray) -> None:
self.process(samples)
[docs]
def reset(self) -> None:
self.max_peak = NEG_INFINITY
self._all_tp_values[0, :] = NEG_INFINITY
self._block_index = 0
def process(self, samples: Float2dArray):
# TODO: Check and raise exception if exceeding MAX_TIME_SECONDS
tp_vals = self.resample_filt(samples)
tp_amp_max = np.abs(tp_vals).max(axis=1)
cur_peaks = lk_log10(tp_amp_max, offset=0, base=20, allow_negative_inf=True)
max_peak = cur_peaks.max()
if max_peak > self.max_peak:
self.max_peak = max_peak
self._all_tp_values[self._block_index, :] = cur_peaks
self._block_index += 1