from __future__ import annotations
from typing import TypeVar, Generic, NamedTuple
import sys
if sys.version_info < (3, 11):
from typing_extensions import Self
else:
from typing import Self
from abc import ABC, abstractmethod
from fractions import Fraction
import math
import threading
import numpy as np
from .types import (
NumChannelsT, Float2dArray, Float3dArray, AnyArray, IndexArray,
FloatArray, Float2dArray32,
)
from .typeutils import ensure_2d_array, is_float64_array
from .filters import FilterGroup, HS_COEFF, HP_COEFF
T = TypeVar('T')
__all__ = (
'Sampler', 'TruePeakSampler',
'ThreadSafeSampler', 'ThreadSafeTruePeakSampler',
)
[docs]
class BufferShape(NamedTuple):
total_samples: int
"""Total number of samples for the buffer"""
block_size: int
"""The input block size"""
num_blocks: int
"""Number of blocks (``total_samples // block_size``)"""
pad_size: int
"""The padding (overlap) between each windowed :term:`gating block`"""
gate_size: int
"""Total length in samples of each :term:`gating block`"""
num_gate_blocks: int
"""Number of overlapping gating blocks that can be stored within
:attr:`total_samples`
"""
gate_step_size: int
"""The step size in samples between each overlapped :term:`gating block`
"""
[docs]
class Slice:
"""Helper class to manage slicing of overlapping array chunks
This can be used to slice overlapping or non-overlapping chunks
from an array, wrapping around the end of the array as needed.
For non-overlapping slices, set :attr:`overlap` to zero.
Arguments:
step: Length of each sliced array chunk
overlap: Number of elements to repeat for each sliced array chunk
max_index: Maximum index value before wrapping to zero
index_: Initial index value (default 0)
.. note::
The naming of :attr:`step` and :attr:`overlap` is somewhat
counter-intuitive. :attr:`step` refers to the length of each
sliced chunk (what would typically be called "window size"), while
:attr:`overlap` refers to the number of elements to repeat between
chunks (what would typically be called "step size").
Examples:
Overlapping Slices:
>>> arr = np.arange(6)
>>> sl = Slice(step=4, overlap=2, max_index=0)
>>> sl.slice(arr, axis=0) # index 0
array([0, 1, 2, 3])
>>> sl.increment(arr, axis=0) # index 1
>>> sl.slice(arr, axis=0)
array([2, 3, 4, 5])
>>> sl.increment(arr, axis=0)
>>> sl.slice(arr, axis=0) # index 2 (wraps around)
array([4, 5, 0, 1])
>>> sl.increment(arr, axis=0)
>>> sl.slice(arr, axis=0) # index 0
array([0, 1, 2, 3])
Non-overlapping Slices:
>>> sl = Slice(step=3, overlap=0, max_index=1)
>>> sl.slice(arr, axis=0) # index 0
array([0, 1, 2])
>>> sl.increment(arr, axis=0)
>>> sl.slice(arr, axis=0) # index 1
array([3, 4, 5])
>>> sl.increment(arr, axis=0)
>>> sl.slice(arr, axis=0) # index 0 (wraps around)
array([0, 1, 2])
"""
step: int
"""Length of each sliced array chunk
(this would be better named "win_size")
"""
overlap: int
"""Number of elements to repeat for each sliced array chunk
(this would be better named "step")
"""
max_index: int
"""Maximum :attr:`index` value before wrapping to zero when :attr:`overlap` is zero.
.. note::
This has no effect when :attr:`overlap` is non-zero, since the slice will
wrap around the end of the array as needed regardless of the index value.
"""
def __init__(
self,
step: int,
max_index: int,
index_: int = 0,
overlap: int = 0
) -> None:
self._index = index_
self.step = step
self.overlap = overlap
self.max_index = max_index
self._start_index: int|None = None
self._end_index: int|None = None
@property
def index(self) -> int:
"""The current index value"""
return self._index
@index.setter
def index(self, value: int):
if value > self.max_index and self.overlap == 0:
value = 0
if value == self._index:
return
self._index = value
self._start_index = None
self._end_index = None
@property
def start_index(self) -> int:
"""The starting index of the current slice"""
ix = self._start_index
if ix is None:
if self.overlap != 0:
ix = self._start_index = self.index * self.overlap
else:
ix = self._start_index = self.index * self.step
if ix < 0:
ix = 0
return ix
@property
def end_index(self) -> int:
"""The ending index of the current slice"""
ix = self._end_index
if ix is None:
ix = self._end_index = self.start_index + self.step
return ix
[docs]
def increment(self, x: AnyArray, axis: int) -> None:
"""Increment the slice to the next position, wrapping around the end
of the array as needed
Arguments:
x: The array being sliced
axis: The axis along which to slice
"""
if self.index == 0:
self.index += 1
return
if self.overlap != 0:
start_ix = self.start_index + self.overlap
else:
start_ix = self.start_index + self.step
if start_ix >= x.shape[axis]:
self.index = 0
else:
self._index += 1
self._start_index = start_ix
self._end_index = None
[docs]
def is_wrapped(self, x: AnyArray, axis: int) -> bool:
"""Whether the current slice wraps around the end of the array
Arguments:
x: The array being sliced
axis: The axis along which to slice
"""
return self.end_index > x.shape[axis]
[docs]
def indices(self, arr_len: int) -> IndexArray:
"""Get an index array for the current slice, wrapping around
the end of the array as needed
Arguments:
arr_len: Length of the array being sliced
"""
a = np.arange(self.step, dtype=np.intp) + self.start_index
a[a>=arr_len] -= arr_len
return a
[docs]
def calc_shape(self, x: AnyArray, axis: int) -> tuple[int, ...]:
"""Calculate the shape of the sliced array along the specified axis
Arguments:
x: The array being sliced
axis: The axis along which to slice
"""
ndim = x.ndim
if axis == ndim - 1:
new_shape = list(x.shape[:-1])
new_shape.append(self.step)
else:
new_shape = list(x.shape)
ax_size = self.step
new_shape[axis] = ax_size
del new_shape[axis + 1]
return tuple(new_shape)
[docs]
def build_slice_array(self, x: AnyArray, axis: int) -> tuple[slice|IndexArray, ...]:
"""Build a tuple of slices/indices for slicing the array along
the specified axis
If the slice wraps around the end of the array, an index array
will be used for that axis. Otherwise, a standard slice will be used.
Arguments:
x: The array being sliced
axis: The axis along which to slice
"""
start_ix: int|None
start_ix, end_ix = self.start_index, self.end_index
if start_ix == 0:
start_ix = None
sl_arr: list[slice|IndexArray] = [
slice(None, None, None) for _ in range(x.ndim)
]
ax_slice: slice|IndexArray
if self.is_wrapped(x, axis):
ax_slice = self.indices(x.shape[axis])
else:
ax_slice = slice(start_ix, end_ix)
sl_arr[axis] = ax_slice
return tuple(sl_arr)
[docs]
def slice(self, x: AnyArray, axis: int) -> AnyArray:
"""Get the current slice of the array along the specified axis
Arguments:
x: The array being sliced
axis: The axis along which to slice
"""
sl_arr = self.build_slice_array(x, axis)
new_shape = self.calc_shape(x, axis)
return np.reshape(x[sl_arr], new_shape)
def __repr__(self) -> str:
return f'<Slice: {self}>'
def __str__(self) -> str:
return str(self.index)
[docs]
def calc_buffer_length(sample_rate: int, block_size: int) -> BufferShape:
"""Calculate an appropriate :class:`BufferShape` for the given
sample rate and block size
The :attr:`~BufferShape.total_samples` of the result will be chosen to
divide evenly with both the :attr:`~BufferShape.block_size` and
:attr:`~BufferShape.pad_size`, allowing for input and output views of the
same array through :func:`reshaping <numpy.reshape>`
"""
fs = Fraction(sample_rate, 1)
overlap = Fraction(3, 4)
step = 1 - overlap
step_samp = fs * step
assert step_samp % 1 == 0
gate_len = Fraction(4, 10)
pad_len = Fraction(1, 10)
assert (sample_rate * gate_len) % 1 == 0
assert (sample_rate * pad_len) % 1 == 0
gate_size = int(sample_rate * gate_len)
pad_size = int(sample_rate * pad_len)
bfr_len = math.lcm(pad_size, block_size)
while bfr_len <= gate_size:
bfr_len *= 2
assert bfr_len % 1 == 0
bfr_len = int(bfr_len)
assert bfr_len % block_size == 0
num_blocks = bfr_len // block_size
bfr_t = bfr_len / fs
x = (bfr_t - gate_len) / (gate_len * step)
num_gb = int(np.round(float(x)+1))
return BufferShape(
total_samples=bfr_len,
block_size=block_size,
num_blocks=num_blocks,
pad_size=pad_size,
gate_size=gate_size,
num_gate_blocks=num_gb,
gate_step_size=int(step_samp),
)
[docs]
class BaseSampler(ABC, Generic[NumChannelsT]):
sample_rate: Fraction
"""The sample rate of the input data"""
block_size: int
"""Sample length per call to :meth:`write`"""
num_channels: NumChannelsT
"""Number of channels"""
sample_array: Float2dArray
"""Flat array to store samples waiting to process"""
write_view: Float3dArray
"""View of :attr:`sample_array` with shape
``(num_channels, block_size, sample_array.shape[1] // block_size)``
"""
def __init__(self, block_size: int, num_channels: NumChannelsT, sample_rate: int = 48000) -> None:
self.block_size = block_size
self.sample_rate = Fraction(sample_rate, 1)
self.num_channels = num_channels
self.bfr_shape = self._calc_buffer_shape()
bfr_len = self.bfr_shape.total_samples
self.sample_array = np.zeros(
(num_channels, bfr_len),
dtype=np.float64,
)
self.write_view = np.reshape(
self.sample_array,
(num_channels, self.num_blocks, self.block_size)
)
self.write_slice = Slice(self.block_size, max_index=self.num_blocks-1)
self.samples_available = 0
@property
def num_blocks(self) -> int:
"""Alias for :attr:`BufferShape.num_blocks`"""
return self.bfr_shape.num_blocks
@property
def total_samples(self) -> int:
"""Alias for :attr:`BufferShape.total_samples`"""
return self.bfr_shape.total_samples
@abstractmethod
def _calc_buffer_shape(self) -> BufferShape:
"""Calculate the :class:`BufferShape` for this sampler"""
raise NotImplementedError
[docs]
def write(self, samples: Float2dArray|Float2dArray32, apply_filter: bool = True) -> None:
"""Store input data into the internal buffer.
The input data must be of shape ``(num_channels, block_size)``
"""
assert samples.shape == (self.num_channels, self.block_size)
self._write(samples)
def _write(self, samples: Float2dArray|Float2dArray32) -> None:
sl = self.write_slice
self.write_view[:,sl.index,:] = samples
sl.index += 1
self.samples_available += samples.shape[1]
[docs]
def can_write(self) -> bool:
"""Whether there is enough room on the internal buffer for at least
one call to :meth:`write`
"""
return self.samples_available <= self.total_samples - self.block_size
[docs]
@abstractmethod
def read(self) -> Float2dArray:
"""Read samples from the internal buffer"""
raise NotImplementedError
[docs]
@abstractmethod
def can_read(self) -> bool:
"""Whether there are enough samples to read"""
raise NotImplementedError
[docs]
def clear(self) -> None:
"""Clear the internal buffer"""
self._clear()
def _clear(self) -> None:
self.samples_available = 0
self.write_slice.index = 0
[docs]
class Sampler(BaseSampler[NumChannelsT]):
"""Allows input data to be stored in chunks of a specified length
and read out in windowed segments as needed for :term:`gating block`
calculations.
"""
gate_view: Float2dArray
"""Sliding window view of :attr:`~BaseSampler.sample_array` with 75% overlap
and shape ``(num_channels, gate_size, sample_array.shape[1] // gate_size)``
"""
filter: FilterGroup[NumChannelsT]
"""A :class:`~.filters.FilterGroup` with both stages of the pre-filter
defined in :term:`BS 1770`
"""
def __init__(self, block_size: int, num_channels: NumChannelsT, sample_rate: int = 48000) -> None:
super().__init__(block_size, num_channels, sample_rate)
self.gate_view = self.sample_array.view()
self.gate_slice = Slice(
step=self.gate_size,
overlap=self.pad_size,
max_index=self.num_gate_blocks,
)
coeff = [HS_COEFF, HP_COEFF]
if sample_rate != 48000:
coeff = [c.as_sample_rate(sample_rate) for c in coeff]
self.filter = FilterGroup(*coeff, num_channels=self.num_channels)
@property
def gate_size(self) -> int:
"""Length of one gated block in samples (400ms)"""
return self.bfr_shape.gate_size
@property
def pad_size(self) -> int:
"""Overlap amount per gated block in samples (100ms)"""
return self.bfr_shape.pad_size
@property
def num_gate_blocks(self) -> int:
"""Alias for :attr:`BufferShape.num_gate_blocks`"""
return self.bfr_shape.num_gate_blocks
def _calc_buffer_shape(self) -> BufferShape:
return calc_buffer_length(int(self.sample_rate), self.block_size)
[docs]
def write(self, samples: Float2dArray|Float2dArray32, apply_filter: bool = True) -> None:
"""Store input data into the internal buffer, optionally applying the
:attr:`pre-filter <filter>`
The input data must be of shape ``(num_channels, block_size)``
"""
assert samples.shape == (self.num_channels, self.block_size)
if apply_filter:
if not is_float64_array(samples):
samples = samples.astype(np.float64)
samples = self.filter(samples)
super().write(samples)
[docs]
def can_read(self) -> bool:
"""Whether there are enough samples in the internal buffer for at least
one call to :meth:`read`
"""
return self.samples_available >= self.gate_size
[docs]
def read(self) -> Float2dArray:
"""Get the samples for one :term:`gating block`
"""
return self._read()
def _read(self) -> Float2dArray:
sl = self.gate_slice
r: FloatArray = sl.slice(self.gate_view, axis=1)
sl.increment(self.gate_view, axis=1)
self.samples_available -= self.pad_size
return ensure_2d_array(r)
def _clear(self) -> None:
super()._clear()
self.gate_slice.index = 0
self.filter.reset()
[docs]
class TruePeakSampler(BaseSampler[NumChannelsT]):
"""A :class:`Sampler` subclass for use with true peak sampling
This sampler writes in the same way as :class:`Sampler`, but reads
are not overlapping.
The length of each read is determined by :attr:`gate_duration`.
"""
gate_view: Float3dArray
"""View of :attr:`~BaseSampler.sample_array` with shape
``(num_channels, num_gate_blocks, gate_size)``
"""
gate_duration: Fraction
"""Duration of each read in seconds. Default is 400ms.
The chosen duration must be divisible by the sample rate.
Shorter durations (e.g., 100ms) may be used for faster updates and *should*
not affect the accuracy of the true peak measurement (within reason).
The durations tested and confirmed to be accurate are: ``100ms, 200ms, 400ms, 800ms``.
"""
def __init__(
self,
block_size: int,
num_channels: NumChannelsT,
sample_rate: int = 48000,
gate_duration: Fraction = Fraction(4, 10)
) -> None:
self.gate_duration = gate_duration
super().__init__(block_size, num_channels, sample_rate)
self.gate_view = np.reshape(
self.sample_array,
(num_channels, self.num_gate_blocks, self.gate_size)
)
self.gate_slice = Slice(
step=self.gate_size,
overlap=0,
max_index=self.num_gate_blocks-1,
)
@property
def gate_size(self) -> int:
"""Length of each read in samples, depending on :attr:`gate_duration`"""
return self.bfr_shape.gate_size
@property
def num_gate_blocks(self) -> int:
"""Number of :attr:`gate_size` blocks that can be stored in the internal buffer"""
return self.bfr_shape.num_gate_blocks
def _calc_buffer_shape(self) -> BufferShape:
fs = self.sample_rate
gate_time = self.gate_duration
assert (fs * gate_time) % 1 == 0
gate_samples = int(fs * gate_time)
bfr_len = math.lcm(self.block_size, gate_samples)
if bfr_len == self.block_size:
bfr_len *= 2
assert bfr_len > self.block_size
num_blocks = bfr_len // self.block_size
assert bfr_len % gate_samples == 0
num_gate_blocks = bfr_len // gate_samples
return BufferShape(
total_samples=bfr_len,
block_size=self.block_size,
num_blocks=num_blocks,
pad_size=0, # No overlap for true peak sampling
gate_size=gate_samples,
num_gate_blocks=num_gate_blocks,
gate_step_size=gate_samples,
)
[docs]
def can_read(self) -> bool:
"""Whether there are enough samples in the internal buffer for at least
one call to :meth:`read`
"""
return self.samples_available >= self.gate_size
[docs]
def read(self) -> Float2dArray:
"""Get next available samples.
The result will be of shape ``(num_channels, gate_size)``.
"""
return self._read()
def _read(self) -> Float2dArray:
sl = self.gate_slice
r: FloatArray = self.gate_view[:, sl.index, :]
sl.index += 1
assert r.shape == (self.num_channels, self.gate_size)
self.samples_available -= self.gate_size
return ensure_2d_array(r)
def _clear(self) -> None:
super()._clear()
self.gate_slice.index = 0
[docs]
class LockContext:
"""A mixin for context manager support using a :class:`threading.RLock`
"""
_lock: threading.RLock
[docs]
def acquire(self, blocking: bool = True, timeout: float = -1) -> bool:
"""Acquire the underlying lock
See :meth:`threading.Lock.acquire` for argument details
"""
return self._lock.acquire(blocking, timeout)
[docs]
def release(self) -> None:
"""Release the underlying lock
See :meth:`threading.Lock.release` for argument details
"""
self._lock.release()
def __enter__(self) -> Self:
self.acquire()
return self
def __exit__(self, *args) -> None:
self.release()
[docs]
class ThreadSafeSampler(Sampler[NumChannelsT], LockContext):
"""A :class:`Sampler` subclass for use with threaded reads and writes
"""
def __init__(self, block_size: int, num_channels: NumChannelsT, sample_rate: int = 48000) -> None:
super().__init__(block_size, num_channels, sample_rate)
self._lock = threading.RLock()
def _write(self, samples: Float2dArray|Float2dArray32) -> None:
with self:
super()._write(samples)
def _read(self) -> Float2dArray:
with self:
return super()._read()
def _clear(self) -> None:
with self:
super()._clear()
[docs]
class ThreadSafeTruePeakSampler(TruePeakSampler[NumChannelsT], LockContext):
"""A :class:`TruePeakSampler` subclass for use with threaded reads and writes
"""
def __init__(
self,
block_size: int,
num_channels: NumChannelsT,
sample_rate: int = 48000,
gate_duration: Fraction = Fraction(4, 10)
) -> None:
super().__init__(block_size, num_channels, sample_rate, gate_duration)
self._lock = threading.RLock()
def _write(self, samples: Float2dArray|Float2dArray32) -> None:
with self:
super()._write(samples)
def _read(self) -> Float2dArray:
with self:
return super()._read()
def _clear(self) -> None:
with self:
super()._clear()