Source code for lupy.processing

from __future__ import annotations

from typing import Generic, overload
import sys
if sys.version_info < (3, 11):
    from typing_extensions import Self
else:
    from typing import Self
from abc import ABC, abstractmethod
import bisect
import math

import numpy as np

from .arraytypes import MeterArray, TruePeakArray
from .types import (
    NumChannelsT, Float1dArray, Float2dArray, Any1dArray,
    Floating, FloatArray,
)
from .typeutils import ensure_nd_array, build_meter_array, build_true_peak_array
from .filters import TruePeakFilter

__all__ = ('BlockProcessor', 'TruePeakProcessor')

SILENCE_DB: Floating = np.float64(-200.)
EPSILON: Floating = np.float64(1e-20)
NEG_INFINITY: Floating = np.float64(-np.inf)


[docs] class RunningSum: """Helper class to calculate the running sum of a series of values """ __slots__ = ('_value', '_count', '_mean') def __init__(self) -> None: self._value: Floating = np.float64(0) self._count: int = 0 self._mean: Floating|None = None @property def value(self) -> Floating: """The current running sum """ return self._value @value.setter def value(self, value: Floating) -> None: if value == self._value: return self._value = value self._mean = None @property def count(self) -> int: """The number of values in the running sum """ return self._count @count.setter def count(self, count: int) -> None: if count == self._count: return self._count = count self._mean = None @property def mean(self) -> Floating: """The mean of the running sum """ m = self._mean if m is None: m = self._mean = self._value / self._count return m
[docs] def add(self, value: Floating) -> None: """Add a data point to the running sum """ self.value += value self.count += 1
[docs] def clear(self) -> None: """Reset :attr:`value` and :attr:`count` to zero """ self.value = np.float64(0) self.count = 0
def __iadd__(self, value: Floating) -> Self: self.add(value) return self def __eq__(self, other: object) -> bool: if not isinstance(other, (np.floating, float, int)): return NotImplemented return self.value == other def __gt__(self, other: Floating) -> bool|np.bool: return self.value > other def __ge__(self, other: Floating) -> bool|np.bool: return self.value >= other def __lt__(self, other: Floating) -> bool|np.bool: return self.value < other def __le__(self, other: Floating) -> bool|np.bool: return self.value <= other def __repr__(self) -> str: return f'<{self.__class__.__name__}: {self}>' def __str__(self) -> str: return f'value={float(self.value)}, count={self.count}'
@overload def lk_log10(x: FloatArray, offset: float = -0.691, base: int = 10, allow_negative_inf: bool = ...) -> FloatArray: ... @overload def lk_log10(x: np.floating, offset: float = -0.691, base: int = 10, allow_negative_inf: bool = ...) -> np.floating: ... def lk_log10( x: FloatArray|np.floating, offset: float = -0.691, base: int = 10, allow_negative_inf: bool = False, ) -> FloatArray|np.floating: if allow_negative_inf: with np.errstate(divide='ignore'): r = offset + base * np.log10(x) return r if isinstance(x, np.ndarray): x[np.less_equal(x, 0)] = EPSILON return offset + base * np.log10(x) if x <= 0: x = EPSILON return np.float64(offset + base * math.log10(x)) @overload def from_lk_log10(x: FloatArray, offset: float = 0.691) -> FloatArray: ... @overload def from_lk_log10(x: np.floating, offset: float = 0.691) -> np.floating: ... def from_lk_log10( x: FloatArray|np.floating, offset: float = 0.691 ) -> FloatArray|np.floating: return 10 ** ((x + offset) / 10)
[docs] def square_and_sum(a: Float2dArray) -> Float1dArray: """Compute the per-row sum-of-squares of a 2-D array Equivalent to ``np.square(a).sum(axis=1)`` but avoids allocating the intermediate ``(rows, cols)`` squared array, which can be large when *a* has many columns. """ return np.einsum('ij,ij->i', a, a)
def _sorted_quantile(sorted_vals: list[float], lo: int, n: int, q: float) -> float: """Compute a quantile from a pre-sorted list slice using linear interpolation. Matches ``numpy.quantile(arr, q, method='linear')`` exactly. Parameters: sorted_vals: the full sorted list lo: start index of the relevant slice within *sorted_vals* n: number of elements in the slice q: quantile in [0, 1] """ idx = q * (n - 1) i_lo = int(idx) i_hi = min(i_lo + 1, n - 1) frac = idx - i_lo v_lo = sorted_vals[lo + i_lo] v_hi = sorted_vals[lo + i_hi] return v_lo + frac * (v_hi - v_lo)
[docs] class BaseProcessor(ABC, Generic[NumChannelsT]): """Abstract base class for audio measurement processors """ num_channels: NumChannelsT """Number of audio channels""" sample_rate: int """The sample rate of the audio data""" def __init__(self, num_channels: NumChannelsT, sample_rate: int = 48000) -> None: self.num_channels = num_channels self.sample_rate = sample_rate
[docs] @abstractmethod def __call__(self, samples: Float2dArray) -> None: """Process one :term:`gating block` Input data must be of shape ``(num_channels, gate_size)`` """ raise NotImplementedError
[docs] @abstractmethod def reset(self) -> None: """Reset all measurement data """ raise NotImplementedError
[docs] class BlockProcessor(BaseProcessor[NumChannelsT]): """Process audio samples and store the resulting loudness data Arguments: num_channels: Number of audio channels gate_size: Length of one :term:`gating block` in samples sample_rate: Sample rate of the audio data (default: 48000) momentary_enabled: Enable :term:`Momentary Loudness` processing (default: ``True``) short_term_enabled: Enable :term:`Short-Term Loudness` processing (default: ``True``) lra_enabled: Enable :term:`Loudness Range` processing (default: ``True``) .. important:: If *short_term_enabled* is ``False``, *lra_enabled* must also be ``False``. :term:`Loudness Range` calculation depends on :term:`Short-Term Loudness` values. Raises: ValueError: If *short_term_enabled* is ``False`` and *lra_enabled* is ``True`` """ gate_size: int """The length of one :term:`gating block` in samples""" integrated_lkfs: Floating """The current :term:`Integrated Loudness`""" lra: float """The current :term:`Loudness Range` If :attr:`lra_enabled` is ``False``, this will always be ``0`` """ MAX_BLOCKS = 144000 # <- 14400 seconds (4 hours) / .1 (100 milliseconds) _channel_weights = np.array([1, 1, 1, 1.41, 1.41]) _block_data: MeterArray _Zij: Float2dArray _block_weighted_sums: Float1dArray _quarter_block_weighted_sums: Float1dArray _block_loudness: Float1dArray _blocks_above_abs_thresh: Any1dArray[np.dtype[np.bool_]] _blocks_above_rel_thresh: Any1dArray[np.dtype[np.bool_]] def __init__( self, num_channels: NumChannelsT, gate_size: int, sample_rate: int = 48000, momentary_enabled: bool = True, short_term_enabled: bool = True, lra_enabled: bool = True, ) -> None: super().__init__(num_channels=num_channels, sample_rate=sample_rate) self.gate_size = gate_size self.pad_size = gate_size // 4 if lra_enabled and not short_term_enabled: raise ValueError("LRA calculation requires short-term loudness to be enabled") self._momentary_enabled = momentary_enabled self._short_term_enabled = short_term_enabled self._lra_enabled = lra_enabled self.weights = self._channel_weights[:self.num_channels] self._block_data = build_meter_array(self.MAX_BLOCKS) self._tg: float = 1.0 / gate_size self._tp: float = 1.0 / (gate_size // 4) self._Zij = np.zeros( (self.num_channels, self.MAX_BLOCKS), dtype=np.float64, ) self._block_weighted_sums = np.zeros(self.MAX_BLOCKS, dtype=np.float64) self._quarter_block_weighted_sums = np.zeros(self.MAX_BLOCKS, dtype=np.float64) self._block_loudness = np.zeros(self.MAX_BLOCKS, dtype=np.float64) self._t = self._block_data['t'] self._t[:] = np.arange(self.MAX_BLOCKS) / self.sample_rate * (self.gate_size / 4) self._blocks_above_abs_thresh = np.zeros( self.MAX_BLOCKS, dtype=bool ) self._blocks_above_rel_thresh = np.zeros( self.MAX_BLOCKS, dtype=bool ) self._above_abs_running_sum = RunningSum() self._above_rel_running_sum = RunningSum() self._lra_abs_power_running_sum = RunningSum() self._rel_threshold: Floating = np.float64(SILENCE_DB) self._momentary_sum = RunningSum() self._short_term_sum = RunningSum() self._momentary_lkfs: Float1dArray = self._block_data['m'] self._short_term_lkfs: Float1dArray = self._block_data['s'] self.integrated_lkfs = SILENCE_DB self.lra = 0 self.num_blocks = 0 self.block_index = 0 # Incremental state for LRA calculation (avoids full O(n log n) rescan per block) self._lra_sorted_abs_gated: list[float] = [] @property def momentary_enabled(self) -> bool: """Whether :term:`Momentary Loudness` processing is enabled (read-only) """ return self._momentary_enabled @property def short_term_enabled(self) -> bool: """Whether :term:`Short-Term Loudness` processing is enabled (read-only) """ return self._short_term_enabled @property def lra_enabled(self) -> bool: """Whether :term:`Loudness Range` processing is enabled (read-only) """ return self._lra_enabled @property def block_data(self) -> MeterArray: """A structured array of measurement values with dtype :obj:`~.arraytypes.MeterDtype` """ return self._block_data[:self.block_index] @property def momentary_lkfs(self) -> Float1dArray: """:term:`Momentary Loudness` for each 100ms block, averaged over 400ms (not gated) If :attr:`momentary_enabled` is ``False``, this will be an array of zeros """ return self.block_data['m'] @property def short_term_lkfs(self) -> Float1dArray: """:term:`Short-Term Loudness` for each 100ms block, averaged over 3 seconds (not gated) If :attr:`short_term_enabled` is ``False``, this will be an array of zeros """ return self.block_data['s'] @property def t(self) -> Float1dArray: """The measurement time for each element in :attr:`short_term_lkfs` and :attr:`momentary_lkfs` """ return self.block_data['t'] @property def Zij(self) -> Float2dArray: """Mean-squared values per channel in each 400ms block (not weighted) """ zij = self._Zij[:,:self.block_index] return ensure_nd_array(zij, 2)
[docs] def reset(self) -> None: """Reset all measurement data """ n = self.block_index self.block_data['m'][:] = 0 self.block_data['s'][:] = 0 self._Zij[:, :n] = 0 self._block_weighted_sums[:n] = 0 self._quarter_block_weighted_sums[:n] = 0 self._block_loudness[:n] = 0 self._rel_threshold = SILENCE_DB self._blocks_above_abs_thresh[:n] = False self._blocks_above_rel_thresh[:n] = False self._above_rel_running_sum.clear() self._above_abs_running_sum.clear() self._lra_abs_power_running_sum.clear() self.integrated_lkfs = SILENCE_DB self.lra = 0 self.block_index = 0 self.num_blocks = 0 self._lra_sorted_abs_gated = [] self._momentary_sum.clear() self._short_term_sum.clear()
[docs] def __call__(self, samples: Float2dArray) -> None: self.process_block(samples)
def __len__(self) -> int: return self.num_blocks def _calc_gating(self) -> None: block_lk = self._block_loudness[:self.block_index+1] blocks_above_rel_thresh = self._blocks_above_rel_thresh[:self.block_index+1] blocks_above_abs_thresh = self._blocks_above_abs_thresh[:self.block_index+1] block_wsums = self._block_weighted_sums[:self.block_index+1] cur_block_lk = block_lk[-1] cur_block_wsum = block_wsums[-1] above_abs = cur_block_lk >= -70 rel_threshold_changed = False if above_abs: self._above_abs_running_sum += cur_block_wsum blocks_above_abs_thresh[-1] = above_abs if self._above_abs_running_sum == 0: rel_threshold = SILENCE_DB else: rel_threshold = lk_log10(self._above_abs_running_sum.mean) - 10 if rel_threshold != self._rel_threshold: rel_threshold_changed = True rs = self._above_rel_running_sum if rel_threshold_changed: blocks_above_rel_thresh[:] = block_lk >= rel_threshold x = block_wsums[np.logical_and( blocks_above_rel_thresh, blocks_above_abs_thresh )] rs.value = x.sum() rs.count = x.size else: # If the relative threshold hasn't changed, we can avoid doing # a full array comparison and just add the current block to the running sum. blocks_above_rel_thresh[-1] = cur_block_lk >= rel_threshold if cur_block_lk >= rel_threshold and above_abs: rs += cur_block_wsum self._rel_threshold = rel_threshold if not rs.count: self.integrated_lkfs = SILENCE_DB else: self.integrated_lkfs = lk_log10(rs.mean) def _calc_momentary(self): block_index = self.block_index new_val = self._quarter_block_weighted_sums[block_index] self._momentary_sum.value += new_val if block_index >= 3: self._momentary_sum.value -= self._quarter_block_weighted_sums[block_index - 3] self._momentary_sum.count = 3 else: self._momentary_sum.count = block_index + 1 r = lk_log10(self._momentary_sum.mean) if r < SILENCE_DB: r = SILENCE_DB self._momentary_lkfs[block_index] = r def _calc_short_term(self): block_index = self.block_index num_blocks = 30 # 3 second window (100ms per block_index) new_val = self._quarter_block_weighted_sums[block_index] self._short_term_sum.value += new_val if block_index >= num_blocks: self._short_term_sum.value -= self._quarter_block_weighted_sums[block_index - num_blocks] self._short_term_sum.count = num_blocks else: self._short_term_sum.count = block_index + 1 st = lk_log10(self._short_term_sum.mean) self._short_term_lkfs[block_index] = st def _calc_lra(self): block_index = self.block_index # Update incremental state with the current block's short-term loudness. # This must happen every block (not just when block_index >= 4) so that # the sorted list is fully populated when we first compute LRA at block 4. st_lkfs = float(self._short_term_lkfs[block_index]) if st_lkfs >= -70.0: bisect.insort(self._lra_sorted_abs_gated, st_lkfs) # from_lk_log10 inlined as Python float arithmetic to avoid numpy overhead self._lra_abs_power_running_sum += np.float64(10.0 ** ((st_lkfs + 0.691) / 10.0)) if block_index < 4: return if not self._lra_abs_power_running_sum.count: return mean_abs_power = self._lra_abs_power_running_sum.mean st_integrated = lk_log10(np.float64(mean_abs_power)) rel_threshold = float(st_integrated) - 20.0 sorted_vals = self._lra_sorted_abs_gated n_all = len(sorted_vals) # Find the count of values >= rel_threshold using binary search lo_rel = bisect.bisect_left(sorted_vals, rel_threshold) n_rel = n_all - lo_rel if not n_rel: return self.lra = ( _sorted_quantile(sorted_vals, lo_rel, n_rel, 0.95) - _sorted_quantile(sorted_vals, lo_rel, n_rel, 0.10) )
[docs] def process_block(self, samples: Float2dArray): """Process one :term:`gating block` Input data must be of shape ``(num_channels, gate_size)`` """ # TODO: Check and raise exception if exceeding MAX_BLOCKS assert samples.shape == (self.num_channels, self.gate_size) if self.momentary_enabled or self.short_term_enabled: # Compute sum-of-squares for the quarter block (last 100ms) first, # then sum the rest and accumulate. This avoids allocating and reading # back the full (num_channels, gate_size) squared intermediate twice. quarter_sq_sum: Float1dArray = square_and_sum(samples[:, -self.pad_size:]) sq_sum: Float1dArray = ( square_and_sum(samples[:, :-self.pad_size]) + quarter_sq_sum ) self._process_quarter_block(quarter_sq_sum) else: sq_sum = square_and_sum(samples) _Zij = self._tg * sq_sum assert _Zij.shape == (self.num_channels,) weighted_sum = np.dot(_Zij, self.weights) self._block_weighted_sums[self.block_index] = weighted_sum block_loudness = lk_log10(weighted_sum) self._block_loudness[self.block_index] = block_loudness self._Zij[:,self.block_index] = _Zij self._calc_gating() if self.momentary_enabled: self._calc_momentary() if self.short_term_enabled: self._calc_short_term() if self.lra_enabled: self._calc_lra() self.block_index += 1 self.num_blocks += 1
def _process_quarter_block(self, quarter_sq_sum: Float1dArray): # Calculate the weighted squared-sum of the last 100ms segment within # this 400ms block (for use in momentary/short-term calculation). # quarter_sq_sum is the per-channel sum of squares for the final pad_size # samples, already computed in process_block. weighted_sum = self._tp * np.dot(quarter_sq_sum, self.weights) self._quarter_block_weighted_sums[self.block_index] = weighted_sum
[docs] class TruePeakProcessor(BaseProcessor[NumChannelsT]): """Process audio samples to extract their :term:`True Peak` values """ max_peak: Floating """Maximum :term:`True Peak` value detected""" gate_size: int """The length in samples to process per call""" MAX_TIME_SECONDS = 14400.0 # 4 hours def __init__(self, num_channels: NumChannelsT, gate_size: int, sample_rate: int = 48000) -> None: super().__init__(num_channels=num_channels, sample_rate=sample_rate) up_sample = 4 if sample_rate < 88100 else 2 self.resample_filt = TruePeakFilter( num_channels=num_channels, upsample_factor=up_sample, ) self.max_peak = NEG_INFINITY self.gate_size = gate_size gate_t = gate_size / sample_rate max_blocks = int(self.MAX_TIME_SECONDS / gate_t) self._tp_array: TruePeakArray[NumChannelsT] = build_true_peak_array( num_channels=self.num_channels, size=max_blocks, ) self._t = self._tp_array['t'] self._t[:] = np.arange(max_blocks) * gate_t self._all_tp_values = self._tp_array['tp'] self._all_tp_values[...] = NEG_INFINITY self._block_index = 0 @property def tp_array(self) -> TruePeakArray[NumChannelsT]: """A structured array of measurement values with dtype :obj:`~.arraytypes.TruePeakDtype` """ return self._tp_array[:self._block_index] @property def t(self) -> Float1dArray: """The measurement times for all processed blocks in :attr:`tp_array` """ return self.tp_array['t'] @property def all_tp_values(self) -> np.ndarray[tuple[int, NumChannelsT], np.dtype[np.float64]]: """All :term:`True Peak` values per channel for each processed block """ return self.tp_array['tp'] @property def current_peaks(self) -> np.ndarray[tuple[NumChannelsT], np.dtype[np.float64]]: """:term:`True Peak` values per channel from the last processing period""" if self._block_index == 0: return self._all_tp_values[0] return self.all_tp_values[-1]
[docs] def __call__(self, samples: Float2dArray) -> None: self.process(samples)
[docs] def reset(self) -> None: self.max_peak = NEG_INFINITY self._all_tp_values[0, :] = NEG_INFINITY self._block_index = 0
def process(self, samples: Float2dArray): # TODO: Check and raise exception if exceeding MAX_TIME_SECONDS tp_vals = self.resample_filt(samples) tp_amp_max = np.abs(tp_vals).max(axis=1) cur_peaks = lk_log10(tp_amp_max, offset=0, base=20, allow_negative_inf=True) max_peak = cur_peaks.max() if max_peak > self.max_peak: self.max_peak = max_peak self._all_tp_values[self._block_index, :] = cur_peaks self._block_index += 1