# MuFun-Instruct / audio_preprocess.py
import os
import torch
import warnings
warnings.filterwarnings('ignore')
import requests
from io import BytesIO
from transformers.pipelines.audio_utils import ffmpeg_read
import mutagen
from torchaudio import functional as taF
import numpy as np
feature_extractor_sampling_rate = 16000  # target sample rate for the feature extractor
clip_length = 30 * feature_extractor_sampling_rate  # 30 s clips, in samples
clip_drop = feature_extractor_sampling_rate // 2  # drop trailing chunks shorter than 0.5 s
AUDIO_EXTENSIONS = ('.wav', '.mp3', '.flac', '.opus', '.ogg')
def load_audio_single(audio_file, seg=None):
    """Load one audio file, optionally crop it to seg=(start_s, end_s), resample
    to 16 kHz, and split it into 30 s chunks. Returns a list of numpy arrays."""
    assert isinstance(audio_file, str), "audio_file should be a string"
    if audio_file.endswith(AUDIO_EXTENSIONS):
        inputs = audio_file
        if inputs.startswith("http://") or inputs.startswith("https://"):
            # We need to actually check for a real protocol, otherwise it's impossible to use a local file
            inputs = requests.get(inputs).content
            # Probe the sample rate from the downloaded bytes; this assumes mutagen
            # can detect the format from the stream alone (no filename available).
            in_sampling_rate = mutagen.File(BytesIO(inputs)).info.sample_rate
        else:
            in_sampling_rate = mutagen.File(inputs).info.sample_rate
            with open(inputs, "rb") as f:
                inputs = f.read()
        if isinstance(inputs, bytes):
            inputs = ffmpeg_read(inputs, in_sampling_rate)
        if seg is not None:
            # Crop to the requested (start, end) window, given in seconds.
            inputs = inputs[int(seg[0] * in_sampling_rate):int(seg[1] * in_sampling_rate)]
        if in_sampling_rate != feature_extractor_sampling_rate:
            inputs = taF.resample(
                torch.from_numpy(inputs.copy()), in_sampling_rate, feature_extractor_sampling_rate
            ).numpy()
        if len(inputs) <= clip_length:
            return [inputs]
        audios = []
        for i in range(0, len(inputs), clip_length):
            chunk = inputs[i : i + clip_length]
            # Keep a chunk only if it is longer than clip_drop (0.5 s).
            if len(chunk) > clip_drop:
                audios.append(chunk)
        return audios
    if audio_file.endswith('.npy'):
        # Pre-extracted arrays are loaded as-is, with no cropping or chunking.
        return [np.load(audio_file)]
    raise ValueError(f"Unsupported audio file: {audio_file}")
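# Worked example of the chunking above (comment only): a 65 s file at 16 kHz
# has 1,040,000 samples; slicing by clip_length (480,000) yields chunks of
# 30 s, 30 s, and 5 s. All three are kept because each exceeds clip_drop
# (0.5 s); a trailing chunk of, say, 0.3 s would be dropped.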
def load_audios(audio_preprocess, audio_files, segs=None, audio_folder=None):
    """Load and preprocess a batch of audio files. Returns a flat list of
    preprocessed chunks plus the per-file chunk counts."""
    if audio_files is None:
        return None, None
    if isinstance(audio_files, str):
        audio_files = [audio_files]
    if segs:
        if isinstance(segs[0], float):
            # A single (start, end) pair was passed; wrap it into a list.
            segs = [segs]
    else:
        segs = [None for _ in range(len(audio_files))]
    if audio_folder:
        audio_files = [os.path.join(audio_folder, afile) for afile in audio_files]

    def get_single_audio(audio_file, seg):
        try:
            audio = load_audio_single(audio_file, seg)
            audio = [audio_preprocess(aud) for aud in audio]
        except Exception as e:
            print(f"Error loading {audio_file} seg {seg}: {e}")
            # Return an empty list (count 0) so sizes stay aligned with audio_files.
            audio = []
        return audio

    audio_size = []
    audio_list = []
    for audio_file, seg in zip(audio_files, segs):
        single_audio_list = get_single_audio(audio_file, seg)
        audio_size.append(len(single_audio_list))
        audio_list.extend(single_audio_list)
    return audio_list, audio_size
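# Example of the seg handling above (comment only):
#   load_audios(p, "a.wav", segs=[3.0, 8.5])  crops a.wav to the 3.0-8.5 s window,
# since a single float pair is wrapped into a one-element list; passing a list of
# pairs, e.g. segs=[[0.0, 5.0], [5.0, 10.0]] with two files, crops each file separately.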
class AudioPreprocess:
    """Thin wrapper around a Hugging Face audio feature extractor; the image_*
    names are legacy from the vision-style pipeline this code follows."""

    def __init__(self, image_processor, data_args={}):
        self.image_aspect_ratio = getattr(data_args, 'image_aspect_ratio', None)
        self.image_processor = image_processor
        # self.image_grid_pinpoints = getattr(data_args, 'image_grid_pinpoints', None)

    def __call__(self, image):
        assert self.image_aspect_ratio == "audio", "image_aspect_ratio should be 'audio' for audio preprocessing"
        return self.image_processor(image, sampling_rate=feature_extractor_sampling_rate, return_tensors="pt").input_features
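

# Minimal usage sketch (not part of the original file). It assumes a Whisper
# feature extractor and a local "example.wav"; both are illustrative choices,
# not something this repo specifies.
if __name__ == "__main__":
    from transformers import WhisperFeatureExtractor

    class _DataArgs:  # hypothetical stand-in for the training-time data_args
        image_aspect_ratio = "audio"

    # "openai/whisper-large-v3" is an assumed checkpoint for demonstration.
    extractor = WhisperFeatureExtractor.from_pretrained("openai/whisper-large-v3")
    preprocess = AudioPreprocess(extractor, _DataArgs())
    feats, sizes = load_audios(preprocess, ["example.wav"])  # hypothetical file
    if feats:
        # One (1, n_mels, frames) tensor per 30 s chunk.
        print(sizes, feats[0].shape)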