|
import os |
|
import torch |
|
import warnings |
|
warnings.filterwarnings('ignore') |
|
import requests |
|
from io import BytesIO |
|
from transformers.pipelines.audio_utils import ffmpeg_read |
|
import mutagen |
|
from torchaudio import functional as taF |
|
import numpy as np |
|
|
|
# Sample rate (samples per second) that every waveform is resampled to and
# that is passed to the feature extractor as ``sampling_rate``.
feature_extractor_sampling_rate = 16_000

# Maximum number of samples in one clip: 30 times the sample rate.
clip_length = feature_extractor_sampling_rate * 30

# When a long recording is split, chunks with this many samples or fewer
# (half the sample rate) are discarded.
clip_drop = feature_extractor_sampling_rate // 2

# File suffixes that are decoded as audio (``.npy`` is handled separately).
AUDIO_EXTENSIONS = ('.wav', '.mp3', '.flac', '.opus', '.ogg')
|
|
|
|
|
def load_audio_single(audio_file, seg=None):
    """Load one audio source and return it as a list of waveform clips.

    Args:
        audio_file: Local path or ``http(s)://`` URL. Names ending in one of
            ``AUDIO_EXTENSIONS`` are decoded with ``ffmpeg_read``; names
            ending in ``.npy`` are loaded with ``np.load`` and returned as-is.
        seg: Optional ``(start, end)`` pair. Both values are multiplied by
            the file's native sample rate to obtain sample indices, so they
            are expressed in seconds. Ignored for ``.npy`` inputs.

    Returns:
        A list of arrays. Decoded audio is resampled to
        ``feature_extractor_sampling_rate``; recordings longer than
        ``clip_length`` samples are split into consecutive chunks of at most
        ``clip_length`` samples, and chunks of ``clip_drop`` samples or fewer
        are dropped. Returns ``None`` when the extension is not recognised.

    Raises:
        AssertionError: If ``audio_file`` is not a string.
        ValueError: If the audio container cannot be identified.
        requests.HTTPError: If a remote download returns an error status.
    """
    assert isinstance(audio_file, str), "audio_file should be a string"

    if audio_file.endswith('.npy'):
        return [np.load(audio_file)]
    if not audio_file.endswith(AUDIO_EXTENSIONS):
        # Unsupported extension: keep the historical "return None" contract.
        return None

    if audio_file.startswith(("http://", "https://")):
        response = requests.get(audio_file)
        # Fail early instead of handing an HTML error page to the decoder.
        response.raise_for_status()
        payload = response.content
        # A URL is not a filesystem path, so the sample rate has to be probed
        # from the downloaded bytes rather than from the URL string.
        metadata = mutagen.File(BytesIO(payload))
    else:
        metadata = mutagen.File(audio_file)
        with open(audio_file, "rb") as f:
            payload = f.read()

    if metadata is None:
        raise ValueError(f"Could not determine the audio format of {audio_file}")
    in_sampling_rate = metadata.info.sample_rate

    # Decode at the native rate so that `seg` can be applied in native samples.
    waveform = ffmpeg_read(payload, in_sampling_rate)

    if seg is not None:
        start = int(seg[0] * in_sampling_rate)
        stop = int(seg[1] * in_sampling_rate)
        waveform = waveform[start:stop]

    if in_sampling_rate != feature_extractor_sampling_rate:
        # Copy before wrapping in a tensor, as the original code did.
        waveform = taF.resample(
            torch.from_numpy(waveform.copy()),
            in_sampling_rate,
            feature_extractor_sampling_rate,
        ).numpy()

    if len(waveform) <= clip_length:
        return [waveform]

    chunks = (
        waveform[offset:offset + clip_length]
        for offset in range(0, len(waveform), clip_length)
    )
    # Only a trailing remainder can be short enough to be dropped here.
    return [chunk for chunk in chunks if len(chunk) > clip_drop]
|
|
|
|
|
def load_audios(audio_preprocess, audio_files, segs=None, audio_folder=None):
    """Load and preprocess one or more audio files.

    Args:
        audio_preprocess: Callable applied to every clip returned by
            ``load_audio_single``.
        audio_files: A single path/URL string, a list of them, or ``None``.
        segs: Optional segment spec. Either one ``(start, end)`` pair of
            numbers (applied to a single file) or a list with one pair (or
            ``None``) per entry of ``audio_files``.
        audio_folder: Optional directory joined in front of every file name.

    Returns:
        ``(audio_list, audio_size)`` where ``audio_list`` is the flat list of
        preprocessed clips for all files and ``audio_size[i]`` is the number
        of clips contributed by ``audio_files[i]``. A file that fails to load
        is reported on stdout and contributes ``0`` clips.
        Returns ``(None, None)`` when ``audio_files`` is ``None``.
    """
    if audio_files is None:
        return None, None
    if isinstance(audio_files, str):
        audio_files = [audio_files]

    if segs:
        # A flat pair such as [0, 30] or [0.0, 30.0] describes one segment;
        # wrap it so that it lines up with the (single) audio file.
        if isinstance(segs[0], (int, float)):
            segs = [segs]
    else:
        segs = [None] * len(audio_files)

    if audio_folder:
        audio_files = [os.path.join(audio_folder, afile) for afile in audio_files]

    def get_single_audio(audio_file, seg):
        """Return the preprocessed clips of one file, or None on any failure."""
        try:
            clips = load_audio_single(audio_file, seg if seg else None)
            return [audio_preprocess(clip) for clip in clips]
        except Exception as e:
            # Deliberately best-effort: one bad file must not abort the batch.
            print(f"Error loading {audio_file} seg {seg}: {e}")
            return None

    audio_size = []
    audio_list = []
    for idx, audio_file in enumerate(audio_files):
        clips = get_single_audio(audio_file, segs[idx])
        if clips is None:
            # Keep audio_size aligned with audio_files instead of crashing
            # on len(None).
            audio_size.append(0)
            continue
        audio_size.append(len(clips))
        audio_list.extend(clips)

    return audio_list, audio_size
|
|
|
class AudioPreprocess:
    """Callable wrapper that turns a waveform into model input features.

    The wrapped ``image_processor`` is invoked as
    ``image_processor(x, sampling_rate=..., return_tensors="pt")`` and its
    ``input_features`` attribute is returned.
    NOTE(review): the "image" naming looks inherited from an image pipeline;
    the object is used here as an audio feature extractor -- confirm.
    """

    def __init__(self, image_processor, data_args=None):
        """Store the processor and read ``image_aspect_ratio`` from ``data_args``.

        Args:
            image_processor: Callable feature extractor (see class docstring).
            data_args: Optional configuration. May be an object exposing an
                ``image_aspect_ratio`` attribute or a dict with that key.
                When missing, ``image_aspect_ratio`` is ``None``.
        """
        if isinstance(data_args, dict):
            # getattr() never sees dict keys, so look the key up explicitly.
            self.image_aspect_ratio = data_args.get('image_aspect_ratio')
        else:
            self.image_aspect_ratio = getattr(data_args, 'image_aspect_ratio', None)
        self.image_processor = image_processor

    def __call__(self, image):
        """Return ``input_features`` of the processor applied to ``image``.

        Raises:
            AssertionError: If ``image_aspect_ratio`` is not ``"audio"``.
        """
        assert self.image_aspect_ratio == "audio", "image_aspect_ratio should be 'audio' for audio preprocessing"
        features = self.image_processor(
            image,
            sampling_rate=feature_extractor_sampling_rate,
            return_tensors="pt",
        )
        return features.input_features
|
|