"""Processor class for VibeVoice models."""

import os
from typing import Any, Dict, List, Optional, Union

import numpy as np
import torch

from transformers.feature_extraction_utils import FeatureExtractionMixin
from transformers.utils import logging

logger = logging.get_logger(__name__)


class AudioNormalizer:
    """
    Audio normalization for the VibeVoice tokenizer.

    This class normalizes audio to a consistent input level for the
    VibeVoice tokenizer while preserving audio quality.
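
    Example (a minimal sketch; any mono float array works as input):

        >>> import numpy as np
        >>> normalizer = AudioNormalizer(target_dB_FS=-25)
        >>> audio = 0.5 * np.sin(np.linspace(0, 100, 24000)).astype(np.float32)
        >>> normalized = normalizer(audio)
        >>> bool(np.abs(normalized).max() <= 1.0)
        True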
|
""" |
|
|
|
def __init__(self, target_dB_FS: float = -25, eps: float = 1e-6): |
|
""" |
|
Initialize the audio normalizer. |
|
|
|
Args: |
|
target_dB_FS (float): Target dB FS level for the audio. Default: -25 |
|
eps (float): Small value to avoid division by zero. Default: 1e-6 |
|
""" |
|
self.target_dB_FS = target_dB_FS |
|
self.eps = eps |
|
|
|
    def tailor_dB_FS(self, audio: np.ndarray) -> tuple:
        """
        Adjust the audio to the target dB FS level.

        Args:
            audio (np.ndarray): Input audio signal

        Returns:
            tuple: (normalized_audio, rms, scalar)
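
        Example (worked numbers): for a signal with RMS 1.0 and the default
        target of -25 dB FS, the scalar is 10 ** (-25 / 20) ≈ 0.0562, so the
        returned audio has RMS ≈ 0.0562.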
        """
        rms = np.sqrt(np.mean(audio**2))
        # Scale so the RMS level lands at target_dB_FS (dB relative to full scale).
        scalar = 10 ** (self.target_dB_FS / 20) / (rms + self.eps)
        normalized_audio = audio * scalar
        return normalized_audio, rms, scalar

    def avoid_clipping(self, audio: np.ndarray, scalar: Optional[float] = None) -> tuple:
        """
        Avoid clipping by scaling down if necessary.

        Args:
            audio (np.ndarray): Input audio signal
            scalar (float, optional): Explicit divisor to apply instead of the
                automatically computed one

        Returns:
            tuple: (normalized_audio, scalar)
        """
        if scalar is None:
            max_val = np.max(np.abs(audio))
            # Only scale down when the peak exceeds full scale.
            if max_val > 1.0:
                scalar = max_val + self.eps
            else:
                scalar = 1.0

        return audio / scalar, scalar

    def __call__(self, audio: np.ndarray) -> np.ndarray:
        """
        Normalize the audio by adjusting to target dB FS and avoiding clipping.

        Args:
            audio (np.ndarray): Input audio signal

        Returns:
            np.ndarray: Normalized audio signal
        """
        audio, _, _ = self.tailor_dB_FS(audio)
        audio, _ = self.avoid_clipping(audio)
        return audio


class VibeVoiceTokenizerProcessor(FeatureExtractionMixin):
    """
    Processor for VibeVoice acoustic tokenizer models.

    This processor handles audio preprocessing for VibeVoice models, including:
    - Audio format conversion (stereo to mono)
    - Optional audio normalization
    - Streaming support for infinite-length audio

    Args:
        sampling_rate (int, optional): Expected sampling rate. Defaults to 24000.
        normalize_audio (bool, optional): Whether to normalize audio. Defaults to True.
        target_dB_FS (float, optional): Target dB FS for normalization. Defaults to -25.
        eps (float, optional): Small value for numerical stability. Defaults to 1e-6.
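
    Example (illustrative; the input is a silent one-second clip):

        >>> import numpy as np
        >>> processor = VibeVoiceTokenizerProcessor(sampling_rate=24000)
        >>> inputs = processor(audio=np.zeros(24000, dtype=np.float32), return_tensors="pt")
        >>> tuple(inputs["audio"].shape)
        (1, 1, 24000)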
    """

    model_input_names = ["input_features"]

    def __init__(
        self,
        sampling_rate: int = 24000,
        normalize_audio: bool = True,
        target_dB_FS: float = -25,
        eps: float = 1e-6,
        **kwargs,
    ):
        super().__init__(**kwargs)

        self.sampling_rate = sampling_rate
        self.normalize_audio = normalize_audio

        if self.normalize_audio:
            self.normalizer = AudioNormalizer(target_dB_FS=target_dB_FS, eps=eps)
        else:
            self.normalizer = None

        # Configuration used for serialization (see `to_dict`).
        self.feature_extractor_dict = {
            "sampling_rate": sampling_rate,
            "normalize_audio": normalize_audio,
            "target_dB_FS": target_dB_FS,
            "eps": eps,
        }

    def _ensure_mono(self, audio: np.ndarray) -> np.ndarray:
        """
        Convert stereo audio to mono if needed.

        Args:
            audio (np.ndarray): Input audio array

        Returns:
            np.ndarray: Mono audio array
        """
        if len(audio.shape) == 1:
            return audio
        elif len(audio.shape) == 2:
            # Downmix stereo by averaging, whichever axis holds the channels.
            if audio.shape[0] == 2:
                return np.mean(audio, axis=0)
            elif audio.shape[1] == 2:
                return np.mean(audio, axis=1)
            else:
                # Not stereo: squeeze a singleton channel axis if present.
                if audio.shape[0] == 1:
                    return audio.squeeze(0)
                elif audio.shape[1] == 1:
                    return audio.squeeze(1)
                else:
                    raise ValueError(f"Unexpected audio shape: {audio.shape}")
        else:
            raise ValueError(f"Audio should be 1D or 2D, got shape: {audio.shape}")

    def _process_single_audio(self, audio: Union[np.ndarray, List[float]]) -> np.ndarray:
        """
        Process a single audio array.

        Args:
            audio: Single audio input

        Returns:
            np.ndarray: Processed audio
        """
        # Convert the input to a float32 numpy array.
        if not isinstance(audio, np.ndarray):
            audio = np.array(audio, dtype=np.float32)
        else:
            audio = audio.astype(np.float32)

        audio = self._ensure_mono(audio)

        if self.normalize_audio and self.normalizer is not None:
            audio = self.normalizer(audio)

        return audio

    def __call__(
        self,
        audio: Union[str, np.ndarray, List[float], List[np.ndarray], List[List[float]], List[str]] = None,
        sampling_rate: Optional[int] = None,
        return_tensors: Optional[str] = None,
        **kwargs,
    ):
        """
        Process audio for VibeVoice models.

        Args:
            audio: Audio input(s) to process. Can be:
                - str: Path to an audio file
                - np.ndarray: Audio array
                - List[float]: Audio as a list of floats
                - List[np.ndarray]: Batch of audio arrays
                - List[str]: Batch of audio file paths
            sampling_rate (int, optional): Sampling rate of the input audio
            return_tensors (str, optional): Return format ('pt' for PyTorch, 'np' for NumPy)

        Returns:
            dict: Processed audio inputs with key:
                - audio: Audio tensor(s) shaped (batch, 1, samples) when
                  `return_tensors` is set, ready for the model
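
        Example (sketch; reuses the `processor` from the class docstring and
        assumes equal-length clips, since batching stacks them):

            >>> clips = [np.zeros(24000, dtype=np.float32)] * 2
            >>> out = processor(audio=clips, return_tensors="np")
            >>> out["audio"].shape
            (2, 1, 24000)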
        """
        if audio is None:
            raise ValueError("Audio input is required")

        if sampling_rate is not None and sampling_rate != self.sampling_rate:
            logger.warning(
                f"Input sampling rate ({sampling_rate}) differs from expected "
                f"sampling rate ({self.sampling_rate}). Please resample your audio."
            )

        # Resolve the input into a single audio or a batch.
        if isinstance(audio, str):
            audio = self._load_audio_from_path(audio)
            is_batched = False
        elif isinstance(audio, list):
            if len(audio) == 0:
                raise ValueError("Empty audio list provided")

            if all(isinstance(item, str) for item in audio):
                # Batch of file paths.
                audio = [self._load_audio_from_path(path) for path in audio]
                is_batched = True
            else:
                # A list of arrays/lists is a batch; a flat list of floats is one audio.
                is_batched = isinstance(audio[0], (np.ndarray, list))
        else:
            is_batched = False

        if is_batched:
            processed_audio = [self._process_single_audio(a) for a in audio]
        else:
            processed_audio = [self._process_single_audio(audio)]
        # Shape the output as (batch, channels=1, samples).
        if return_tensors == "pt":
            if len(processed_audio) == 1:
                input_features = torch.from_numpy(processed_audio[0]).unsqueeze(0).unsqueeze(1)
            else:
                # Note: stacking assumes all audios in the batch have the same length.
                input_features = torch.stack([torch.from_numpy(a) for a in processed_audio]).unsqueeze(1)
        elif return_tensors == "np":
            if len(processed_audio) == 1:
                input_features = processed_audio[0][np.newaxis, np.newaxis, :]
            else:
                input_features = np.stack(processed_audio)[:, np.newaxis, :]
        else:
            input_features = processed_audio[0] if len(processed_audio) == 1 else processed_audio

        outputs = {
            "audio": input_features,
        }

        return outputs

    def _load_audio_from_path(self, audio_path: str) -> np.ndarray:
        """
        Load audio from a file path.

        Args:
            audio_path (str): Path to audio file

        Returns:
            np.ndarray: Loaded audio array
        """
        file_ext = os.path.splitext(audio_path)[1].lower()

        if file_ext in ['.wav', '.mp3', '.flac', '.m4a', '.ogg']:
            try:
                import librosa
            except ImportError:
                raise ImportError(
                    "librosa is required to load audio files. "
                    "Install it with: pip install librosa"
                )
            # librosa resamples to the processor's rate and downmixes to mono.
            audio_array, _ = librosa.load(
                audio_path,
                sr=self.sampling_rate,
                mono=True,
            )
            return audio_array
        elif file_ext == '.pt':
            # Saved PyTorch tensor.
            audio_tensor = torch.load(audio_path, map_location='cpu')
            if isinstance(audio_tensor, torch.Tensor):
                audio_array = audio_tensor.squeeze().numpy()
            else:
                audio_array = np.asarray(audio_tensor).squeeze()
            return audio_array.astype(np.float32)
        elif file_ext == '.npy':
            # Saved NumPy array.
            audio_array = np.load(audio_path)
            return audio_array.astype(np.float32)
        else:
            raise ValueError(
                f"Unsupported file format: {file_ext}. "
                f"Supported formats: .wav, .mp3, .flac, .m4a, .ogg, .pt, .npy"
            )

    def preprocess_audio(
        self,
        audio_path_or_array: Union[str, np.ndarray],
        normalize: Optional[bool] = None,
    ) -> np.ndarray:
        """
        Convenience method to preprocess audio from a file path or array.
        Kept for backward compatibility; `__call__` is recommended.

        Args:
            audio_path_or_array: Path to audio file or numpy array
            normalize: Whether to normalize (overrides the default setting)

        Returns:
            np.ndarray: Preprocessed audio array
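
        Example (sketch; reuses the `processor` from the class docstring):

            >>> audio = processor.preprocess_audio(np.ones(24000, dtype=np.float32), normalize=True)
            >>> bool(np.abs(audio).max() <= 1.0)
            True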
        """
        if isinstance(audio_path_or_array, str):
            audio_array = self._load_audio_from_path(audio_path_or_array)
        else:
            audio_array = np.array(audio_path_or_array, dtype=np.float32)

        # Temporarily override the normalization setting.
        original_normalize = self.normalize_audio
        original_normalizer = self.normalizer
        if normalize is not None:
            self.normalize_audio = normalize
            if normalize and self.normalizer is None:
                # The processor was built without a normalizer; create a default one
                # so the override actually takes effect.
                self.normalizer = AudioNormalizer()

        try:
            processed = self._process_single_audio(audio_array)
        finally:
            # Restore the original settings.
            self.normalize_audio = original_normalize
            self.normalizer = original_normalizer

        return processed

    def to_dict(self) -> Dict[str, Any]:
        """
        Convert the object to a dict containing all attributes needed for serialization.
        """
        # Return a copy so callers cannot mutate the processor's configuration.
        return dict(self.feature_extractor_dict)

    def save_audio(
        self,
        audio: Union[torch.Tensor, np.ndarray, List[Union[torch.Tensor, np.ndarray]]],
        output_path: str = "output.wav",
        sampling_rate: Optional[int] = None,
        normalize: bool = False,
        batch_prefix: str = "audio_",
    ):
        """
        Save audio data to WAV file(s).

        Args:
            audio: Audio data to save. Can be:
                - torch.Tensor: PyTorch tensor with shape (B, C, T), (B, T), or (T,)
                - np.ndarray: NumPy array with shape (B, C, T), (B, T), or (T,)
                - List of tensors or arrays
            output_path: Where to save the audio. When multiple files are saved,
                this is treated as a directory and individual files are written inside it.
            sampling_rate: Sampling rate for the saved audio. Defaults to the processor's rate.
            normalize: Whether to peak-normalize audio before saving.
            batch_prefix: Filename prefix used when saving multiple audios.

        Returns:
            List[str]: Paths to the saved audio files.
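
        Example (illustrative; requires soundfile and writes to the current directory):

            >>> wav = torch.zeros(1, 1, 24000)  # (B, C, T)
            >>> processor.save_audio(wav, output_path="out.wav")
            ['out.wav']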
        """
        if sampling_rate is None:
            sampling_rate = self.sampling_rate

        try:
            import soundfile as sf
        except ImportError:
            raise ImportError(
                "soundfile is required to save audio files. "
                "Install it with: pip install soundfile"
            )

        # Convert everything to numpy first.
        if isinstance(audio, torch.Tensor):
            audio_np = audio.float().detach().cpu().numpy()
        elif isinstance(audio, np.ndarray):
            audio_np = audio
        elif isinstance(audio, list):
            if all(isinstance(a, torch.Tensor) for a in audio):
                audio_np = [a.float().detach().cpu().numpy() for a in audio]
            else:
                audio_np = audio
        else:
            raise ValueError(f"Unsupported audio type: {type(audio)}")

        saved_paths = []

        if isinstance(audio_np, list):
            # A list of audios: treat output_path as a directory.
            output_dir = output_path
            os.makedirs(output_dir, exist_ok=True)

            for i, audio_item in enumerate(audio_np):
                audio_item = self._prepare_audio_for_save(audio_item, normalize)
                file_path = os.path.join(output_dir, f"{batch_prefix}{i}.wav")
                sf.write(file_path, audio_item, sampling_rate)
                saved_paths.append(file_path)
        else:
            if len(audio_np.shape) >= 3:
                # Batched tensor/array of shape (B, C, T).
                batch_size = audio_np.shape[0]

                if batch_size > 1:
                    # Multiple audios: treat output_path as a directory.
                    output_dir = output_path
                    os.makedirs(output_dir, exist_ok=True)

                    for i in range(batch_size):
                        single_audio = audio_np[i]
                        if len(single_audio.shape) > 1:
                            if single_audio.shape[0] == 1:
                                single_audio = single_audio.squeeze(0)

                        single_audio = self._prepare_audio_for_save(single_audio, normalize)
                        file_path = os.path.join(output_dir, f"{batch_prefix}{i}.wav")
                        sf.write(file_path, single_audio, sampling_rate)
                        saved_paths.append(file_path)
                else:
                    # Single audio with batch/channel dimensions: drop them.
                    audio_item = audio_np.squeeze()
                    audio_item = self._prepare_audio_for_save(audio_item, normalize)
                    sf.write(output_path, audio_item, sampling_rate)
                    saved_paths.append(output_path)
            else:
                # 1D or 2D input: save directly.
                audio_item = self._prepare_audio_for_save(audio_np, normalize)
                sf.write(output_path, audio_item, sampling_rate)
                saved_paths.append(output_path)

        return saved_paths

    def _prepare_audio_for_save(self, audio: np.ndarray, normalize: bool) -> np.ndarray:
        """
        Prepare audio for saving by ensuring the right shape and optionally normalizing.

        Args:
            audio: Audio data as numpy array
            normalize: Whether to peak-normalize the audio

        Returns:
            np.ndarray: Processed audio ready for saving
        """
        # Drop a leading singleton channel dimension.
        if len(audio.shape) > 1 and audio.shape[0] == 1:
            audio = audio.squeeze(0)

        # Peak-normalize to [-1, 1] if requested.
        if normalize:
            max_val = np.abs(audio).max()
            if max_val > 0:
                audio = audio / max_val

        return audio


__all__ = ["VibeVoiceTokenizerProcessor", "AudioNormalizer"]