破译 PCM-Encoded 的音频样本
这部分将变得稍微高级一些,但从长远来看,它将使在 Python 中处理 WAV 文件变得更加容易。
在本教程结束时,我们将构建出 waveio 包:
waveio/ │ ├── __init__.py ├── encoding.py ├── metadata.py ├── reader.py └── writer.py
encoding
模块将负责归一化幅度值和 PCM 编码样本之间的双向转换metadata
模块将表示 WAV 文件头reader
读取和解释音频帧writer
写入 WAV 文件
枚举编码格式
waveio/encoding.py
创建PCMEncoding
类继承枚举类IntEnum
,并实现max
, min
, num_bits
方法。
from enum import IntEnum class PCMEncoding(IntEnum): UNSIGNED_8 = 1 SIGNED_16 = 2 SIGNED_24 = 3 SIGNED_32 = 4 @property def max(self): return 255 if self == 1 else -self.min -1 @property def min(self): return 0 if self == 1 else -(2** (self.num_bits-1)) @property def num_bits(self): return self * 8
Docode 将音频帧转换为振幅
继续向 PCMEncoding
类添加一个新方法decode
,该方法将处理四种编码格式,将帧转换成(归一化的)振幅。
from enum import IntEnum import numpy as np class PCMEncoding(IntEnum): # ... def decode(self, frames): match self: case PCMEncoding.UNSIGNED_8: return np.frombuffer(frames, "u1") / self.max * 2 - 1 case PCMEncoding.SIGNED_16: # little-endin 2-byte signed integer return np.frombuffer(frames, "<i2") / -self.min case PCMEncoding.SIGNED_24: triplets = np.frombuffer(frames, "u1").reshape(-1, 3) padded = np.pad(triplets, ((0, 0), (0, 1)), mode="constant") samples = padded.flatten().view("<i4") samples[samples > self.max] += 2 * self.min return samples / -self.min case PCMEncoding.SIGNED_32: return np.frombuffer(frames, "<i4") / -self.min case _: raise TypeError("unsupported encoding")
Encode 将振幅编码为音频帧
添加.encoder()
方法,将振幅转换成帧。
from enum import IntEnum import numpy as np class PCMEncoding(IntEnum): # ... def _clamp(self, samples): return np.clip(samples, self.min, self.max) def encode(self, amplitudes): match self: case PCMEncoding.UNSIGNED_8: samples = np.round((amplitudes + 1) / 2 * self.max) return self._clamp(samples).astype("u1").tobytes() case PCMEncoding.SIGNED_16: samples = np.round(-self.min * amplitudes) return self._clamp(samples).astype("<i2").tobytes() case PCMEncoding.SIGNED_24: samples = np.round(-self.min * amplitudes) return ( self._clamp(samples) .astype("<i4") .view("u1") .reshape(-1, 4)[:, :3] .flatten() .tobytes() ) case PCMEncoding.SIGNED_32: samples = np.round(-self.min * amplitudes) return self._clamp(samples).astype("<i4").tobytes() case _: raise TypeError("unsupported encoding")
封装 WAV 文件的元数据
管理WAV文件的多个元数据可能很麻烦,因此我们自定义一个数据类,将它们分组在一个命名空间下。
waveio/metadata.py
from dataclasses import dataclass from waveio.encoding import PCMEncoding @dataclass(frozen=True) class WAVMetadata: encoding: PCMEncoding frames_per_second: float num_channels: int num_frames: int | None = None
考虑到人类认喜欢用秒表示声音持续时间,我们添加一个属性num_seconds
进行帧–>秒的转换:
@dataclass(frozen=True) class WAVMetadata: ... @property def num_seconds(self): if self.num_frames is None: raise ValueError("indeterminate stream of audio frames") return self.num_frames / self.frames_per_second
加载所有音频帧
使用原始的wave
读取wav
文件需要手动处理二进制数据,我们将创建reader
避免这一麻烦。
waveio/reader.py
import wave from waveio.encoding import PCMEncoding from waveio.metadata import WAVMetadata class WAVReader: def __init__(self, path): self._wav_file = wave.open(str(path)) self.metadata = WAVMetadata( PCMEncoding(self._wav_file.getsampwidth()), self._wav_file.getframerate(), self._wav_file.getnchannels(), self._wav_file.getnframes(), ) def __enter__(self): return self def __exit__(self, *args, **kwargs): self._wav_file.close()
对于较小的文件,可以直接加载到内存:
class WAVReader: # ... def _read(self, max_frames=None): self._wav_file.rewind() frames = self._wav_file.readframes(max_frames) return self.metadata.encoding.decode(frames)
readframes()会向前移动文件指针,rewind()会将指针重置在开头,确保每次读取都是从头开始读取。
但是,在处理音频信号时,通常需要将数据视为帧/通道序列,而不是单个幅度样本。幸运的是,根据您的需要,您可以快速将一维 NumPy 数组重塑为合适的二维帧或通道矩阵。我们将通过reshape
装饰器实现这一功能。
import wave from functools import cached_property from waveio.encoding import PCMEncoding from waveio.metadata import WAVMetadata class WAVReader: # ... @cached_property @reshape("rows") def frames(self): return self._read(self.metadata.num_frames) @cached_property @reshape("columns") def channels(self): return self.frames
reshape
装饰器的实现如下:
import wave from functools import cached_property, wraps from waveio.encoding import PCMEncoding from waveio.metadata import WAVMetadata def reshape(shape): if shape not in ("rows", "columns"): raise ValueError("shape must be either 'rows' or 'columns'") def decorator(method): @wraps(method) def wrapper(self, *args, **kwargs): values = method(self, *args, **kwargs) reshaped = values.reshape(-1, self.metadata.num_channels) return reshaped if shape == "rows" else reshaped.T return wrapper return decorator # ...
为了让WAVReader
在外部可用,我们在waveio.__init__.py
中暴漏WAVReader
类:
from waveio.reader import WAVReader __all__ = ["WAVReader"]
使用 Matplotlib 绘制静态波形
我们已经可以进行wav文件的读取了,一个很直接的应用是使用matplotlib绘制声音的波形。
plot_waveform.py
from argparse import ArgumentParser from pathlib import Path import matplotlib.pyplot as plt import numpy as np from matplotlib.ticker import FuncFormatter from waveio import WAVReader def main(): args = parse_args() with WAVReader(args.path) as wav: plot(args.path.name, wav.metadata, wav.channels) def parse_args(): parser = ArgumentParser(description="Plot the waveform of a WAV file") parser.add_argument("path", type=Path, help="path to the WAV file") return parser.parse_args() def plot(filename, metadata, channels): fig, ax = plt.subplots( nrows=metadata.num_channels, ncols=1, figsize=(16, 9), sharex=True, # 共享x轴 ) if isinstance(ax, plt.Axes): ax = [ax] time_formatter = FuncFormatter(format_time) timeline = np.linspace( start=0, stop=metadata.num_seconds, num=metadata.num_frames ) for i, channel in enumerate(channels): ax[i].set_title(f"Channel #{i + 1}") ax[i].set_yticks([-1, -0.5, 0, 0.5, 1]) ax[i].xaxis.set_major_formatter(time_formatter) ax[i].plot(timeline, channel) fig.canvas.manager.set_window_title(filename) plt.tight_layout() plt.show() def format_time(instant, _): if instant < 60: return f"{instant:g}s" minutes, seconds = divmod(instant, 60) return f"{minutes:g}m {seconds:02g}s" if __name__ == "__main__": main()
执行
python .\plot_waveform.py .\sounds\Bicycle-bell.wav
可以看到上面的波形图。
读取音频帧的切片
如果您有一个特别长的音频文件,则可以通过缩小感兴趣的音频帧的范围来减少加载和解码基础数据所需的时间。
我们将通过切片功能实现读取一个范围的音频。
首先在脚本参数中添加起始点(start)和结束点(end)这两个参数。
# ... def parse_args(): parser = ArgumentParser(description="Plot the waveform of a WAV file") parser.add_argument("path", type=Path, help="path to the WAV file") parser.add_argument( "-s", "--start", type=float, default=0.0, help="start time in seconds (default: 0.0)", ) parser.add_argument( "-e", "--end", type=float, default=None, help="end time in seconds (default: end of file)", ) return parser.parse_args() def main(): args = parse_args() with WAVReader(args.path) as wav: plot( args.path.name, wav.metadata, wav.channels_sliced(args.start, args.end) ) # ...
在plot
中,时间轴不再从0开始,需要和切片时间匹配:
# ... def plot(filename, metadata, channels): # ... time_formatter = FuncFormatter(format_time) timeline = np.linspace( channels.frames_range.start / metadata.frames_per_second, channels.frames_range.stop / metadata.frames_per_second, len(channels.frames_range) )
然后我们需要更新reader.py
文件,读取音频的任意切片:
# ... class WAVReader: # ... @cached_property @reshape("rows") def frames(self): return self._read(self.metadata.num_frames, start_frame=0) # ... def _read(self, max_frames=None, start_frame=None): if start_frame is not None: self._wav_file.setpos(start_frame) # 设置起始位置 frames = self._wav_file.readframes(max_frames) return self.metadata.encoding.decode(frames) @reshape("columns") def channels_sliced(self, start_seconds=0.0, end_seconds=None): if end_seconds is None: end_seconds = self.metadata.num_seconds frames_slice = slice( round(self.metadata.frames_per_second * start_seconds), round(self.metadata.frames_per_second * end_seconds) ) frames_range = range(*frames_slice.indices(self.metadata.num_frames)) values = self._read(len(frames_range), frames_range.start) return ArraySlice(values, frames_range)
我们借助了ArraySlice
包装切片,包装了numpy array
并且公开了便于绘制时间线的.frames_rage
属性。
在reader.py
中添加ArraySlice
的定义:
# ... class ArraySlice: def __init__(self, values, frames_range): self.values = values self.frames_range = frames_range def __iter__(self): return iter(self.values) def __getattr__(self, name): return getattr(self.values, name) def reshape(self, *args, **kwargs): reshaped = self.values.reshape(*args, **kwargs) return ArraySlice(reshaped, self.frames_range) @property def T(self): return ArraySlice(self.values.T, self.frames_range) # ...
python plot_waveform.py Bongo_sound.wav --start 3.5 --end 3.65