lupy.sampling

class lupy.sampling.BufferShape(total_samples, block_size, num_blocks, pad_size, gate_size, num_gate_blocks, gate_step_size)[source]

Bases: NamedTuple

total_samples: int

Total number of samples for the buffer

block_size: int

The input block size

num_blocks: int

Number of blocks (total_samples // block_size)

pad_size: int

The padding (overlap) between each windowed gating block

gate_size: int

Total length in samples of each gating block

num_gate_blocks: int

Number of overlapping gating blocks that can be stored within total_samples

gate_step_size: int

The step size in samples between each overlapped gating block

class lupy.sampling.Slice(step: int, max_index: int, index_: int = 0, overlap: int = 0)[source]

Bases: object

Helper class to manage slicing of overlapping array chunks

This can be used to slice overlapping or non-overlapping chunks from an array, wrapping around the end of the array as needed.

For non-overlapping slices, set overlap to zero.

Parameters:
  • step (int) – Length of each sliced array chunk

  • overlap (int) – Number of elements to repeat for each sliced array chunk

  • max_index (int) – Maximum index value before wrapping to zero

  • index – Initial index value (default 0)

Note

The naming of step and overlap is somewhat counter-intuitive. step refers to the length of each sliced chunk (what would typically be called “window size”), while overlap refers to the number of elements to repeat between chunks (what would typically be called “step size”).

Examples

Overlapping Slices:

>>> arr = np.arange(6)
>>> sl = Slice(step=4, overlap=2, max_index=0)
>>> sl.slice(arr, axis=0)       # index 0
array([0, 1, 2, 3])
>>> sl.increment(arr, axis=0)   # index 1
>>> sl.slice(arr, axis=0)
array([2, 3, 4, 5])
>>> sl.increment(arr, axis=0)
>>> sl.slice(arr, axis=0)       # index 2 (wraps around)
array([4, 5, 0, 1])
>>> sl.increment(arr, axis=0)
>>> sl.slice(arr, axis=0)       # index 0
array([0, 1, 2, 3])

Non-overlapping Slices:

>>> sl = Slice(step=3, overlap=0, max_index=1)
>>> sl.slice(arr, axis=0)       # index 0
array([0, 1, 2])
>>> sl.increment(arr, axis=0)
>>> sl.slice(arr, axis=0)       # index 1
array([3, 4, 5])
>>> sl.increment(arr, axis=0)
>>> sl.slice(arr, axis=0)       # index 0 (wraps around)
array([0, 1, 2])
step: int

Length of each sliced array chunk (this would be better named “win_size”)

overlap: int

Number of elements to repeat for each sliced array chunk (this would be better named “step”)

max_index: int

Maximum index value before wrapping to zero when overlap is zero.

Note

This has no effect when overlap is non-zero, since the slice will wrap around the end of the array as needed regardless of the index value.

property index: int

The current index value

property start_index: int

The starting index of the current slice

property end_index: int

The ending index of the current slice

increment(x: ndarray[tuple[Any, ...], dtype[Any]], axis: int) None[source]

Increment the slice to the next position, wrapping around the end of the array as needed

Parameters:
is_wrapped(x: ndarray[tuple[Any, ...], dtype[Any]], axis: int) bool[source]

Whether the current slice wraps around the end of the array

Parameters:
indices(arr_len: int) ndarray[ShapeT, dtype[int64]][source]

Get an index array for the current slice, wrapping around the end of the array as needed

Parameters:

arr_len (int) – Length of the array being sliced

calc_shape(x: ndarray[tuple[Any, ...], dtype[Any]], axis: int) tuple[int, ...][source]

Calculate the shape of the sliced array along the specified axis

Parameters:
build_slice_array(x: ndarray[tuple[Any, ...], dtype[Any]], axis: int) tuple[slice | ndarray[ShapeT, dtype[int64]], ...][source]

Build a tuple of slices/indices for slicing the array along the specified axis

If the slice wraps around the end of the array, an index array will be used for that axis. Otherwise, a standard slice will be used.

Parameters:
slice(x: ndarray[tuple[Any, ...], dtype[Any]], axis: int) ndarray[tuple[Any, ...], dtype[Any]][source]

Get the current slice of the array along the specified axis

Parameters:
lupy.sampling.calc_buffer_length(sample_rate: int, block_size: int) BufferShape[source]

Calculate an appropriate BufferShape for the given sample rate and block size

The total_samples of the result will be chosen to divide evenly with both the block_size and pad_size, allowing for input and output views of the same array through reshaping

class lupy.sampling.BaseSampler(block_size: int, num_channels: NumChannelsT, sample_rate: int = 48000)[source]

Bases: ABC, Generic[NumChannelsT]

block_size: int

Sample length per call to write()

sample_rate: Fraction

The sample rate of the input data

num_channels: NumChannelsT

Number of channels

sample_array: ndarray[tuple[int, int], dtype[float64]]

Flat array to store samples waiting to process

write_view: ndarray[tuple[int, int, int], dtype[float64]]

View of sample_array with shape (num_channels, block_size, sample_array.shape[1] // block_size)

property num_blocks: int

Alias for BufferShape.num_blocks

property total_samples: int

Alias for BufferShape.total_samples

write(samples: ndarray[tuple[int, int], dtype[float64]] | ndarray[tuple[int, int], dtype[float32]], apply_filter: bool = True) None[source]

Store input data into the internal buffer.

The input data must be of shape (num_channels, block_size)

can_write() bool[source]

Whether there is enough room on the internal buffer for at least one call to write()

abstractmethod read() ndarray[tuple[int, int], dtype[float64]][source]

Read samples from the internal buffer

abstractmethod can_read() bool[source]

Whether there are enough samples to read

clear() None[source]

Clear the internal buffer

class lupy.sampling.Sampler(block_size: int, num_channels: NumChannelsT, sample_rate: int = 48000)[source]

Bases: BaseSampler[NumChannelsT]

Allows input data to be stored in chunks of a specified length and read out in windowed segments as needed for gating block calculations.

gate_view: ndarray[tuple[int, int], dtype[float64]]

Sliding window view of sample_array with 75% overlap and shape (num_channels, gate_size, sample_array.shape[1] // gate_size)

filter: FilterGroup[NumChannelsT]

A FilterGroup with both stages of the pre-filter defined in BS 1770

property gate_size: int

Length of one gated block in samples (400ms)

property pad_size: int

Overlap amount per gated block in samples (100ms)

property num_gate_blocks: int

Alias for BufferShape.num_gate_blocks

write(samples: ndarray[tuple[int, int], dtype[float64]] | ndarray[tuple[int, int], dtype[float32]], apply_filter: bool = True) None[source]

Store input data into the internal buffer, optionally applying the pre-filter

The input data must be of shape (num_channels, block_size)

can_read() bool[source]

Whether there are enough samples in the internal buffer for at least one call to read()

read() ndarray[tuple[int, int], dtype[float64]][source]

Get the samples for one gating block

class lupy.sampling.TruePeakSampler(block_size: int, num_channels: NumChannelsT, sample_rate: int = 48000, gate_duration: Fraction = Fraction(2, 5))[source]

Bases: BaseSampler[NumChannelsT]

A Sampler subclass for use with true peak sampling

This sampler writes in the same way as Sampler, but reads are not overlapping.

The length of each read is determined by gate_duration.

gate_duration: Fraction

Duration of each read in seconds. Default is 400ms.

The chosen duration must be divisible by the sample rate. Shorter durations (e.g., 100ms) may be used for faster updates and should not affect the accuracy of the true peak measurement (within reason).

The durations tested and confirmed to be accurate are: 100ms, 200ms, 400ms, 800ms.

gate_view: ndarray[tuple[int, int, int], dtype[float64]]

View of sample_array with shape (num_channels, num_gate_blocks, gate_size)

property gate_size: int

Length of each read in samples, depending on gate_duration

property num_gate_blocks: int

Number of gate_size blocks that can be stored in the internal buffer

can_read() bool[source]

Whether there are enough samples in the internal buffer for at least one call to read()

read() ndarray[tuple[int, int], dtype[float64]][source]

Get next available samples.

The result will be of shape (num_channels, gate_size).

class lupy.sampling.LockContext[source]

Bases: object

A mixin for context manager support using a threading.RLock

acquire(blocking: bool = True, timeout: float = -1) bool[source]

Acquire the underlying lock

See threading.Lock.acquire() for argument details

release() None[source]

Release the underlying lock

See threading.Lock.release() for argument details

class lupy.sampling.ThreadSafeSampler(block_size: int, num_channels: NumChannelsT, sample_rate: int = 48000)[source]

Bases: Sampler[NumChannelsT], LockContext

A Sampler subclass for use with threaded reads and writes

class lupy.sampling.ThreadSafeTruePeakSampler(block_size: int, num_channels: NumChannelsT, sample_rate: int = 48000, gate_duration: Fraction = Fraction(2, 5))[source]

Bases: TruePeakSampler[NumChannelsT], LockContext

A TruePeakSampler subclass for use with threaded reads and writes