import os
import warnings

warnings.filterwarnings('ignore')

from io import BytesIO

import mutagen
import numpy as np
import requests
import torch
from torchaudio import functional as taF
from transformers.pipelines.audio_utils import ffmpeg_read

# Sampling rate expected by the downstream feature extractor (16 kHz).
feature_extractor_sampling_rate = 16000
# Maximum clip length: 30 seconds at the target rate.
clip_length = 30 * feature_extractor_sampling_rate
# Trailing chunks with at most half a second of audio are discarded.
clip_drop = feature_extractor_sampling_rate // 2
AUDIO_EXTENSIONS = ('.wav', '.mp3', '.flac', '.opus', '.ogg')


def load_audio_single(audio_file, seg=None):
    """Load one audio source and return it as a list of <=30 s waveform chunks.

    Args:
        audio_file: Local path or http(s) URL to an audio file with one of
            AUDIO_EXTENSIONS, or a local ``.npy`` path holding a precomputed
            waveform array.
        seg: Optional ``[start_sec, end_sec]`` pair; when given, only that
            segment (in the *original* sampling rate) is kept.

    Returns:
        List of 1-D numpy arrays resampled to 16 kHz, each at most
        ``clip_length`` samples. Trailing chunks no longer than ``clip_drop``
        samples are dropped.

    Raises:
        ValueError: If the file extension is not supported.
    """
    assert isinstance(audio_file, str), "audio_file should be a string"

    # Precomputed waveform: load as-is (no resampling or chunking applied).
    if audio_file.endswith('.npy'):
        return [np.load(audio_file)]

    if not audio_file.endswith(AUDIO_EXTENSIONS):
        # Bug fix: the original silently returned None here, which crashed
        # callers later; fail loudly instead (callers catch Exception).
        raise ValueError(f"Unsupported audio file: {audio_file}")

    # We need to actually check for a real protocol, otherwise it's
    # impossible to use a local file.
    if audio_file.startswith("http://") or audio_file.startswith("https://"):
        data = requests.get(audio_file).content
    else:
        with open(audio_file, "rb") as f:
            data = f.read()

    # Probe the native sample rate from the raw bytes. Bug fix: the original
    # ran mutagen.File() on the URL string *before* downloading, which broke
    # remote files (and left the BytesIO import unused).
    in_sampling_rate = mutagen.File(BytesIO(data)).info.sample_rate

    inputs = ffmpeg_read(data, in_sampling_rate)

    # Segment is specified in seconds relative to the original rate, so slice
    # before resampling.
    if seg is not None:
        inputs = inputs[int(seg[0] * in_sampling_rate):int(seg[1] * in_sampling_rate)]

    if in_sampling_rate != feature_extractor_sampling_rate:
        inputs = taF.resample(
            torch.from_numpy(inputs.copy()),
            in_sampling_rate,
            feature_extractor_sampling_rate,
        ).numpy()

    if len(inputs) <= clip_length:
        return [inputs]

    # Split long audio into fixed-size clips; drop a too-short trailing chunk.
    audios = []
    for i in range(0, len(inputs), clip_length):
        chunk = inputs[i:i + clip_length]
        if len(chunk) > clip_drop:
            audios.append(chunk)
    return audios


def load_audios(audio_preprocess, audio_files, segs=None, audio_folder=None):
    """Load and preprocess a batch of audio files, best-effort.

    Args:
        audio_preprocess: Callable applied to each waveform chunk (e.g. an
            ``AudioPreprocess`` instance).
        audio_files: A single path/URL or a list of them; ``None`` short-circuits.
        segs: Optional per-file ``[start, end]`` segments. A single flat pair
            is promoted to a one-element list.
        audio_folder: Optional directory prefixed to every entry of
            ``audio_files``.

    Returns:
        ``(audio_list, audio_size)`` where ``audio_list`` is the flat list of
        preprocessed chunks and ``audio_size[i]`` is the chunk count of the
        i-th successfully loaded file. Files that fail to load are logged and
        skipped (bug fix: the original crashed on ``len(None)``).
        Returns ``(None, None)`` when ``audio_files`` is ``None``.
    """
    if audio_files is None:
        return None, None
    if isinstance(audio_files, str):
        audio_files = [audio_files]

    if segs:
        # A single flat [start, end] pair (numeric first element) means one
        # segment for one file; wrap it. Accept ints as well as floats.
        if isinstance(segs[0], (int, float)):
            segs = [segs]
    else:
        segs = [None] * len(audio_files)

    if audio_folder:
        audio_files = [os.path.join(audio_folder, afile) for afile in audio_files]

    def get_single_audio(audio_file, seg):
        # Best effort: any failure is logged and reported as None.
        try:
            if seg:
                audio = load_audio_single(audio_file, seg)
            else:
                audio = load_audio_single(audio_file)
            audio = [audio_preprocess(aud) for aud in audio]
        except Exception as e:
            print(f"Error loading {audio_file} seg {seg}: {e}")
            audio = None
        return audio

    audio_size = []
    audio_list = []
    for audio_file, seg in zip(audio_files, segs):
        single_audio_list = get_single_audio(audio_file, seg)
        if single_audio_list is None:
            # Skip files that failed to load instead of crashing downstream.
            continue
        audio_size.append(len(single_audio_list))
        audio_list.extend(single_audio_list)
    return audio_list, audio_size


class AudioPreprocess:
    """Adapter that runs a HF audio feature extractor on a raw waveform."""

    def __init__(self, image_processor, data_args=None):
        # Bug fix: mutable default {} replaced by None; getattr(None, ..., None)
        # yields the same result as getattr({}, ..., None), so behavior is
        # unchanged for omitted data_args.
        self.image_aspect_ratio = getattr(data_args, 'image_aspect_ratio', None)
        self.image_processor = image_processor

    def __call__(self, image):
        # Guard: this preprocessor is only valid in "audio" mode.
        assert self.image_aspect_ratio == "audio", \
            "image_aspect_ratio should be 'audio' for audio preprocessing"
        return self.image_processor(
            image,
            sampling_rate=feature_extractor_sampling_rate,
            return_tensors="pt",
        ).input_features