Source code for lupy.signalutils.resample

from __future__ import annotations
from typing import Generic, NamedTuple, cast
import math

import numpy as np
from scipy.signal import firwin
from scipy.signal._upfirdn_apply import _output_len, _apply, mode_enum

from ..types import Float1dArray, Float2dArray, NumChannelsT
from ..typeutils import ensure_1d_array, ensure_2d_array



[docs] class ResamplePolyParams(NamedTuple): """Parameters for polyphase resampling """ up: int """Upsampling factor""" down: int """Downsampling factor""" n_in: int """Number of input samples""" n_out: int """Number of output samples""" h: Float1dArray """The padded FIR filter window""" result_slice: tuple[slice, ...] """Slice object to extract the valid output samples from the polyphase upfirdn filtering step."""
# Adapted from: # https://github.com/scipy/scipy/blob/87c46641a8b3b5b47b81de44c07b840468f7ebe7/scipy/signal/_signaltools.py#L3363-L3384 #
[docs] def calc_tp_fir_win(upsample_factor: int) -> Float1dArray: """Calculate an appropriate low-pass FIR filter for over-sampling The method matches what is done in :func:`scipy.signal.resample_poly`, but with the following adjustments: - The downsampling factor is fixed as 1 (no downsampling) since only upsampling is required for :term:`True Peak` detection. - The FIR filter length is ``8 * upsample_factor + 1`` instead of ``10 * upsample_factor + 1``. This is a minimal length that still performs well for true peak detection, but is much more efficient than the default length used by :func:`scipy.signal.resample_poly`. """ max_rate = upsample_factor f_c = 1 / max_rate numtaps = 8 * max_rate + 1 # Casting to str because firwin's type hints are not compatible with the # implementation. window = cast(str, ('kaiser', 5.0)) h = firwin( numtaps, # len == 33 with upsample factor of 4 f_c, window=window ) assert isinstance(h, np.ndarray) h = h.astype(np.float64) return ensure_1d_array(h)
# Adapted from: # https://github.com/scipy/scipy/blob/e29dcb65a2040f04819b426a04b60d44a8f69c04/scipy/signal/_signaltools.py#L3547-L3759 #
[docs] def calc_resample_poly_params( up: int, down: int, n_samples: int, window: Float1dArray ) -> ResamplePolyParams: """Calculate parameters for polyphase resampling The parameters match those of :func:`scipy.signal.resample_poly` but assume that the window is already provided. Also assumes 2D input with the filter applied along the last axis. """ g_ = math.gcd(up, down) up //= g_ down //= g_ n_in = n_samples n_out = n_in * up n_out = n_out // down + bool(n_out % down) assert window.ndim == 1 half_len = (window.size - 1) // 2 h = window * up n_pre_pad = (down - half_len % down) n_post_pad = 0 n_pre_remove = (half_len + n_pre_pad) // down while _output_len(len(h) + n_pre_pad + n_post_pad, n_in, up, down) < n_out + n_pre_remove: n_post_pad += 1 h = np.concatenate((np.zeros(n_pre_pad, dtype=h.dtype), h, np.zeros(n_post_pad, dtype=h.dtype))) h = ensure_1d_array(h) n_pre_remove_end = n_pre_remove + n_out result_slice = np.s_[:, n_pre_remove:n_pre_remove_end] return ResamplePolyParams( up, down, n_in, n_out, h, result_slice, )
[docs] class ResamplePoly(Generic[NumChannelsT]): """Polyphase resampler using FIR window Precomputes parameters for efficient repeated resampling. Arguments: up: Upsampling factor down: Downsampling factor num_channels: Number of channels window: FIR filter window num_input_samples: Number of input samples. If not specified, defaults to 1024. """ DEFAULT_INPUT_SAMPLES = 1024 def __init__( self, up: int, down: int, num_channels: NumChannelsT, window: Float1dArray, num_input_samples: int = DEFAULT_INPUT_SAMPLES, ) -> None: self._up = up self._down = down self._num_channels = num_channels self._num_input_samples = num_input_samples self._window = window self.params = calc_resample_poly_params( up, down, num_input_samples, window, ) # Create the _UpFIRDn instance as used in scipy: # https://github.com/scipy/scipy/blob/e29dcb65a2040f04819b426a04b60d44a8f69c04/scipy/signal/_upfirdn.py#L214-L216 # # instead of discarding it after each call to resample_poly # (which is quite wasteful). self._upfirdn = _UpFIRDn( h=self.params.h, up=self.params.up, down=self.params.down, input_shape=self.input_shape, ) @property def num_channels(self) -> NumChannelsT: """Number of channels""" return self._num_channels @property def num_input_samples(self) -> int: """Number of input samples per call to :meth:`apply` If this is changed, internal parameters are recalculated. """ return self._num_input_samples @num_input_samples.setter def num_input_samples(self, value: int) -> None: if value == self._num_input_samples: return self._num_input_samples = value self.params = calc_resample_poly_params( up=self._up, down=self._down, n_samples=value, window=self._window, ) self._upfirdn = _UpFIRDn( h=self.params.h, up=self.params.up, down=self.params.down, input_shape=self.input_shape, ) @property def num_output_samples(self) -> int: """Number of output samples""" return self.params.n_out @property def input_shape(self) -> tuple[NumChannelsT, int]: """Expected input shape for :meth:`apply`""" return (self.num_channels, self.num_input_samples) @property def output_shape(self) -> tuple[NumChannelsT, int]: """Output shape for :meth:`apply`""" return (self.num_channels, self.num_output_samples)
[docs] def apply(self, x: Float2dArray) -> Float2dArray: """Resample the input array using polyphase filtering The input array must be 2D with shape ``(num_channels, num_input_samples)``. If the number of input samples differs from :attr:`num_input_samples`, internal parameters are recalculated. """ num_samples = x.shape[-1] if num_samples != self._num_input_samples: self.num_input_samples = num_samples y = self._upfirdn.apply_filter(x) r = y[self.params.result_slice] return ensure_2d_array(r)
# Adapted from: # https://github.com/scipy/scipy/blob/e29dcb65a2040f04819b426a04b60d44a8f69c04/scipy/signal/_upfirdn.py#L46-L63 # def _pad_h(h: Float1dArray, up: int) -> Float1dArray: """Store coefficients in a transposed, flipped arrangement. For example, suppose upRate is 3, and the input number of coefficients is 10, represented as h[0], ..., h[9]. Then the internal buffer will look like this:: h[9], h[6], h[3], h[0], // flipped phase 0 coefs 0, h[7], h[4], h[1], // flipped phase 1 coefs (zero-padded) 0, h[8], h[5], h[2], // flipped phase 2 coefs (zero-padded) """ h_padlen = len(h) + (-len(h) % up) h_full = np.zeros(h_padlen, h.dtype) h_full[:len(h)] = h h_full = h_full.reshape(-1, up).T[:, ::-1].ravel() return h_full # Adapted from: # https://github.com/scipy/scipy/blob/e29dcb65a2040f04819b426a04b60d44a8f69c04/scipy/signal/_upfirdn.py#L72-L104 #
[docs] class _UpFIRDn(Generic[NumChannelsT]): """Helper for resampling.""" mode = mode_enum('constant') dtype = np.dtype(np.float64) def __init__( self, h: Float1dArray, up: int, down: int, input_shape: tuple[NumChannelsT, int] ) -> None: if h.ndim != 1 or h.size == 0: raise ValueError('h must be 1-D with non-zero length') self._up = int(up) self._down = int(down) self._axis = 1 self._input_shape = input_shape assert len(self._input_shape) == 2 self._input_len = int(self._input_shape[self._axis]) if self._up < 1 or self._down < 1: raise ValueError('Both up and down must be >= 1') # This both transposes, and "flips" each phase for filtering self._h_trans_flip = _pad_h(h, self._up) self._h_trans_flip = np.ascontiguousarray(self._h_trans_flip) self._h_len_orig = len(h) output_len = _output_len( self._h_len_orig, self._input_len, self._up, self._down ) self._output_len = int(output_len) self._output_shape = ( self._input_shape[0], self._output_len ) self._out_array = np.zeros(self._output_shape, dtype=self.dtype, order='C')
[docs] def apply_filter(self, x: Float2dArray) -> Float2dArray: """Apply the prepared filter to the specified axis of N-D signal x.""" out = self._out_array out[...] = 0 if x.dtype != self.dtype: raise ValueError(f'Input array must be of dtype {self.dtype}') _apply(x, self._h_trans_flip, out, self._up, self._down, self._axis, self.mode, 0) return out