lupy.sampling¶
- class lupy.sampling.BufferShape(total_samples, block_size, num_blocks, pad_size, gate_size, num_gate_blocks, gate_step_size)[source]¶
Bases: NamedTuple
- pad_size: int¶
The padding (overlap) between each windowed gating block
- gate_size: int¶
Total length in samples of each gating block
- num_gate_blocks: int¶
Number of overlapping gating blocks that can be stored within total_samples
- gate_step_size: int¶
The step size in samples between each overlapped gating block
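As a rough illustration of how these fields can relate, the sketch below derives sizes for 400 ms gating blocks with 75% overlap, which is the gating scheme described in BS.1770. The helper name `gating_layout`, its dictionary keys, and the block count without wrap-around are assumptions made for this example; `calc_buffer_length()` may compute its values differently.

```python
from fractions import Fraction


def gating_layout(sample_rate: int, total_samples: int) -> dict[str, int]:
    """Hypothetical helper: sizes for 400 ms gating blocks with 75% overlap."""
    gate_size = int(Fraction(2, 5) * sample_rate)  # 400 ms per gating block
    gate_step_size = gate_size // 4                # 75% overlap -> hop of 1/4 block
    overlap = gate_size - gate_step_size           # samples shared by neighbours
    # Count only complete blocks that fit without wrapping (an assumption here).
    num_gate_blocks = max(0, (total_samples - gate_size) // gate_step_size + 1)
    return {
        "gate_size": gate_size,
        "gate_step_size": gate_step_size,
        "overlap": overlap,
        "num_gate_blocks": num_gate_blocks,
    }


layout = gating_layout(sample_rate=48000, total_samples=48000 * 3)
```

At 48 kHz this gives a gating block of 19200 samples that advances by 4800 samples per step.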
- class lupy.sampling.Slice(step: int, max_index: int, index_: int = 0, overlap: int = 0)[source]¶
Bases: object
Helper class to manage slicing of overlapping array chunks
This can be used to slice overlapping or non-overlapping chunks from an array, wrapping around the end of the array as needed.
For non-overlapping slices, set overlap to zero.
- Parameters:
Note
The naming of step and overlap is somewhat counter-intuitive. step refers to the length of each sliced chunk (what would typically be called “window size”), while overlap refers to the number of elements to repeat between chunks (what would typically be called “step size”).
Examples
Overlapping Slices:
>>> import numpy as np
>>> from lupy.sampling import Slice
>>> arr = np.arange(6)
>>> sl = Slice(step=4, overlap=2, max_index=0)
>>> sl.slice(arr, axis=0)  # index 0
array([0, 1, 2, 3])
>>> sl.increment(arr, axis=0)  # index 1
>>> sl.slice(arr, axis=0)
array([2, 3, 4, 5])
>>> sl.increment(arr, axis=0)
>>> sl.slice(arr, axis=0)  # index 2 (wraps around)
array([4, 5, 0, 1])
>>> sl.increment(arr, axis=0)
>>> sl.slice(arr, axis=0)  # index 0
array([0, 1, 2, 3])
Non-overlapping Slices:
>>> sl = Slice(step=3, overlap=0, max_index=1)
>>> sl.slice(arr, axis=0)  # index 0
array([0, 1, 2])
>>> sl.increment(arr, axis=0)
>>> sl.slice(arr, axis=0)  # index 1
array([3, 4, 5])
>>> sl.increment(arr, axis=0)
>>> sl.slice(arr, axis=0)  # index 0 (wraps around)
array([0, 1, 2])
- overlap: int¶
Number of elements to repeat for each sliced array chunk (this would be better named “step”)
- max_index: int¶
Maximum index value before wrapping to zero when overlap is zero.
Note
This has no effect when overlap is non-zero, since the slice will wrap around the end of the array as needed regardless of the index value.
- increment(x: ndarray[tuple[Any, ...], dtype[Any]], axis: int) None[source]¶
Increment the slice to the next position, wrapping around the end of the array as needed
- is_wrapped(x: ndarray[tuple[Any, ...], dtype[Any]], axis: int) bool[source]¶
Whether the current slice wraps around the end of the array
- indices(arr_len: int) ndarray[ShapeT, dtype[int64]][source]¶
Get an index array for the current slice, wrapping around the end of the array as needed
- Parameters:
arr_len (int) – Length of the array being sliced
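A wrapped index array of this kind can be produced with modular arithmetic. The following is a minimal standalone sketch, not the library's implementation; the function name `wrapped_indices` and its `start` and `length` parameters are hypothetical.

```python
import numpy as np


def wrapped_indices(start: int, length: int, arr_len: int) -> np.ndarray:
    """Indices for `length` elements beginning at `start`, taken modulo `arr_len`."""
    return (start + np.arange(length, dtype=np.int64)) % arr_len


arr = np.arange(6)
idx = wrapped_indices(start=4, length=4, arr_len=arr.shape[0])
chunk = arr[idx]  # the last two elements followed by the first two
```

Here `idx` runs past the end of a six-element array and continues from index zero, matching the wrap-around case in the overlapping example above.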
- calc_shape(x: ndarray[tuple[Any, ...], dtype[Any]], axis: int) tuple[int, ...][source]¶
Calculate the shape of the sliced array along the specified axis
- build_slice_array(x: ndarray[tuple[Any, ...], dtype[Any]], axis: int) tuple[slice | ndarray[ShapeT, dtype[int64]], ...][source]¶
Build a tuple of slices/indices for slicing the array along the specified axis
If the slice wraps around the end of the array, an index array will be used for that axis. Otherwise, a standard slice will be used.
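The choice between a standard slice and an index array can be sketched as follows. This is an illustration of the idea only, with a hypothetical helper `build_index` that takes an explicit `start` and `length`; it is not the signature or code of `build_slice_array()`.

```python
import numpy as np


def build_index(x: np.ndarray, axis: int, start: int, length: int) -> tuple:
    """Hypothetical helper: index tuple selecting `length` items from `start` on `axis`."""
    n = x.shape[axis]
    index: list = [slice(None)] * x.ndim  # take everything on the other axes
    if start + length <= n:
        # Fits before the end of the axis: basic slicing, which returns a view.
        index[axis] = slice(start, start + length)
    else:
        # Runs past the end: modular index array, which returns a copy.
        index[axis] = (start + np.arange(length)) % n
    return tuple(index)


x = np.arange(12).reshape(2, 6)
view = x[build_index(x, axis=1, start=2, length=4)]
wrapped = x[build_index(x, axis=1, start=4, length=4)]
```

One reason to prefer a plain slice where possible is that NumPy basic slicing avoids copying, while indexing with an integer array always allocates a new array.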
- lupy.sampling.calc_buffer_length(sample_rate: int, block_size: int) BufferShape[source]¶
Calculate an appropriate BufferShape for the given sample rate and block size
The total_samples of the result will be chosen to be evenly divisible by both block_size and pad_size, allowing for input and output views of the same array through reshaping.
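The divisibility requirement can be illustrated with a least common multiple. The sketch below is an assumption about one way to satisfy it, not the library's algorithm: the helper `round_up_to_common_multiple`, the example `pad_size` of 4800, the one-second minimum, and the axis order of the reshaped views are all chosen for illustration.

```python
import math

import numpy as np


def round_up_to_common_multiple(min_samples: int, block_size: int, pad_size: int) -> int:
    """Smallest length >= min_samples divisible by both block_size and pad_size."""
    unit = math.lcm(block_size, pad_size)
    return -(-min_samples // unit) * unit  # integer ceiling division


block_size, pad_size = 512, 4800  # hypothetical sizes
total_samples = round_up_to_common_multiple(48000, block_size, pad_size)

buffer = np.zeros((2, total_samples))
# Two reshaped views over the same memory: one per input block, one per pad step.
write_blocks = buffer.reshape(2, total_samples // block_size, block_size)
pad_blocks = buffer.reshape(2, total_samples // pad_size, pad_size)

write_blocks[0, 0, :] = 1.0  # writing through one view is visible in the other
```

Because both reshapes divide the length exactly, neither view needs a copy, so data written block by block can be read back in differently sized segments.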
- class lupy.sampling.BaseSampler(block_size: int, num_channels: NumChannelsT, sample_rate: int = 48000)[source]¶
Bases: ABC, Generic[NumChannelsT]
- num_channels: NumChannelsT¶
Number of channels
- sample_array: ndarray[tuple[int, int], dtype[float64]]¶
Flat array to store samples waiting to process
- write_view: ndarray[tuple[int, int, int], dtype[float64]]¶
View of sample_array with shape (num_channels, block_size, sample_array.shape[1] // block_size)
- property num_blocks: int¶
Alias for BufferShape.num_blocks
- property total_samples: int¶
Alias for BufferShape.total_samples
- write(samples: ndarray[tuple[int, int], dtype[float64]] | ndarray[tuple[int, int], dtype[float32]], apply_filter: bool = True) None[source]¶
Store input data into the internal buffer.
The input data must be of shape (num_channels, block_size)
- can_write() bool[source]¶
Whether there is enough room in the internal buffer for at least one call to write()
- class lupy.sampling.Sampler(block_size: int, num_channels: NumChannelsT, sample_rate: int = 48000)[source]¶
Bases: BaseSampler[NumChannelsT]
Allows input data to be stored in chunks of a specified length and read out in windowed segments as needed for gating block calculations.
- gate_view: ndarray[tuple[int, int], dtype[float64]]¶
Sliding window view of sample_array with 75% overlap and shape (num_channels, gate_size, sample_array.shape[1] // gate_size)
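A sliding window with 75% overlap can be built in NumPy without copying data. The sketch below uses `numpy.lib.stride_tricks.sliding_window_view` on a small made-up array; the sizes are illustrative, and the axis order used here (channels, blocks, samples per block) is an assumption that may differ from the layout of gate_view.

```python
import numpy as np
from numpy.lib.stride_tricks import sliding_window_view

num_channels, gate_size, total = 2, 8, 32
step = gate_size // 4  # 75% overlap: each window starts a quarter block later

samples = np.arange(num_channels * total, dtype=np.float64).reshape(num_channels, total)

# Every possible window along the sample axis, then keep one window per step.
windows = sliding_window_view(samples, window_shape=gate_size, axis=1)
gate_blocks = windows[:, ::step, :]
```

Consecutive blocks in `gate_blocks` share three quarters of their samples, and the result is a read-only view onto `samples` rather than a copy.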
- filter: FilterGroup[NumChannelsT]¶
A FilterGroup with both stages of the pre-filter defined in BS 1770
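For orientation, the two pre-filter stages in BS.1770 are a high-shelf filter followed by a high-pass filter, each a second-order section. The sketch below applies them in pure Python using the coefficients the standard publishes for 48 kHz. The function names are hypothetical, and this is not a description of how FilterGroup is implemented or how it handles other sample rates.

```python
import cmath
import math

# Coefficients published in ITU-R BS.1770 for a 48 kHz sample rate.
# Each stage is (b0, b1, b2, a1, a2) with a0 normalised to 1.
SHELF_48K = (1.53512485958697, -2.69169618940638, 1.19839281085285,
             -1.69065929318241, 0.73248077421585)
HIGHPASS_48K = (1.0, -2.0, 1.0, -1.99004745483398, 0.99007225036621)


def biquad(samples, coeffs):
    """Apply one second-order section (direct form I) to a sequence of floats."""
    b0, b1, b2, a1, a2 = coeffs
    x1 = x2 = y1 = y2 = 0.0
    out = []
    for x0 in samples:
        y0 = b0 * x0 + b1 * x1 + b2 * x2 - a1 * y1 - a2 * y2
        out.append(y0)
        x2, x1 = x1, x0
        y2, y1 = y1, y0
    return out


def k_weight(samples):
    """Run both stages in series: shelf first, then high-pass."""
    return biquad(biquad(samples, SHELF_48K), HIGHPASS_48K)


def gain_db(coeffs, freq_hz, sample_rate=48000):
    """Magnitude response of one stage at freq_hz, in decibels."""
    b0, b1, b2, a1, a2 = coeffs
    z = cmath.exp(-2j * math.pi * freq_hz / sample_rate)  # z^-1 on the unit circle
    h = (b0 + b1 * z + b2 * z * z) / (1 + a1 * z + a2 * z * z)
    return 20 * math.log10(abs(h))


impulse_response = k_weight([1.0, 0.0, 0.0, 0.0])
shelf_boost = gain_db(SHELF_48K, 24000)  # roughly +4 dB at the top of the band
```

The shelf stage leaves low frequencies unchanged and boosts high frequencies by about 4 dB, while the high-pass stage removes DC, since its numerator coefficients sum to zero.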
- property num_gate_blocks: int¶
Alias for BufferShape.num_gate_blocks
- write(samples: ndarray[tuple[int, int], dtype[float64]] | ndarray[tuple[int, int], dtype[float32]], apply_filter: bool = True) None[source]¶
Store input data into the internal buffer, optionally applying the pre-filter
The input data must be of shape (num_channels, block_size)
- class lupy.sampling.TruePeakSampler(block_size: int, num_channels: NumChannelsT, sample_rate: int = 48000, gate_duration: Fraction = Fraction(2, 5))[source]¶
Bases: BaseSampler[NumChannelsT]
A BaseSampler subclass for use with true peak sampling
This sampler writes in the same way as Sampler, but reads are not overlapping.
The length of each read is determined by gate_duration.
- gate_duration: Fraction¶
Duration of each read in seconds. Default is 400ms.
The chosen duration must correspond to a whole number of samples at the sample rate (that is, gate_duration * sample_rate must be an integer). Shorter durations (e.g., 100ms) may be used for faster updates and should not affect the accuracy of the true peak measurement (within reason).
The durations tested and confirmed to be accurate are: 100ms, 200ms, 400ms, 800ms.
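One reading of the constraint on the duration is that gate_duration multiplied by the sample rate must come out to a whole number of samples. The sketch below checks that with exact rational arithmetic; the function name `gate_size_in_samples` and the error message are hypothetical, and the library may validate this differently.

```python
from fractions import Fraction


def gate_size_in_samples(gate_duration: Fraction, sample_rate: int) -> int:
    """Convert a duration in seconds to a sample count, rejecting fractional results."""
    n = gate_duration * sample_rate  # Fraction * int stays exact
    if n.denominator != 1:
        raise ValueError(
            f"{gate_duration} s is not a whole number of samples at {sample_rate} Hz"
        )
    return int(n)


default_size = gate_size_in_samples(Fraction(2, 5), 48000)   # 400 ms
short_size = gate_size_in_samples(Fraction(1, 10), 44100)    # 100 ms
```

Using Fraction rather than float avoids rounding surprises such as 0.1 not being exactly representable in binary floating point.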
- gate_view: ndarray[tuple[int, int, int], dtype[float64]]¶
View of sample_array with shape (num_channels, num_gate_blocks, gate_size)
- property gate_size: int¶
Length of each read in samples, depending on gate_duration
- class lupy.sampling.LockContext[source]¶
Bases: object
A mixin for context manager support using a threading.RLock
- acquire(blocking: bool = True, timeout: float = -1) bool[source]¶
Acquire the underlying lock
See threading.Lock.acquire() for argument details
- release() None[source]¶
Release the underlying lock
See threading.Lock.release() for details
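The general pattern of a lock mixin with context manager support can be sketched as below. This is a standalone illustration using only the standard library; the class names `LockMixin` and `Counter` and the `_lock` attribute are hypothetical and do not describe the internals of LockContext.

```python
import threading


class LockMixin:
    """Hypothetical mixin: exposes an RLock through acquire/release and `with`."""

    def __init__(self) -> None:
        self._lock = threading.RLock()

    def acquire(self, blocking: bool = True, timeout: float = -1) -> bool:
        return self._lock.acquire(blocking, timeout)

    def release(self) -> None:
        self._lock.release()

    def __enter__(self):
        self.acquire()
        return self

    def __exit__(self, *exc_info) -> None:
        self.release()


class Counter(LockMixin):
    """Toy shared state standing in for a sampler's buffer."""

    def __init__(self) -> None:
        super().__init__()
        self.value = 0

    def add(self, n: int) -> None:
        with self:  # safe even if the caller already holds the lock (re-entrant)
            self.value += n


counter = Counter()


def worker() -> None:
    for _ in range(1000):
        counter.add(1)


threads = [threading.Thread(target=worker) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

A re-entrant lock suits this design because a method that takes the lock can be called from code that is already inside a `with` block on the same object without deadlocking.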
- class lupy.sampling.ThreadSafeSampler(block_size: int, num_channels: NumChannelsT, sample_rate: int = 48000)[source]¶
Bases: Sampler[NumChannelsT], LockContext
A Sampler subclass for use with threaded reads and writes
- class lupy.sampling.ThreadSafeTruePeakSampler(block_size: int, num_channels: NumChannelsT, sample_rate: int = 48000, gate_duration: Fraction = Fraction(2, 5))[source]¶
Bases: TruePeakSampler[NumChannelsT], LockContext
A TruePeakSampler subclass for use with threaded reads and writes