|
import torch |
|
import numpy as np |
|
import librosa |
|
import requests |
|
import io |
|
import os |
|
import base64 |
|
import matplotlib.pyplot as plt |
|
import soundfile as sf |
|
from scipy.signal import butter, lfilter |
|
from transformers import pipeline, AutoImageProcessor, AutoModelForImageClassification |
|
from PIL import Image |
|
from pydub import AudioSegment |
|
|
|
|
|
TARGET_SR = 2000 |
|
IMAGE_HEIGHT = 128 |
|
|
|
def butter_bandpass_filter(data, fs, lowcut=20.0, highcut=200.0, order=3): |
|
nyq = 0.5 * fs |
|
low = lowcut / nyq |
|
high = highcut / nyq |
|
if high >= 1 or low <= 0: |
|
return data |
|
b, a = butter(order, [low, high], btype='band') |
|
return lfilter(b, a, data) |
|
|
|
def create_spectrogram_image(y_cleaned, sr): |
|
mel_spec = librosa.feature.melspectrogram(y=y_cleaned, sr=sr, n_mels=IMAGE_HEIGHT) |
|
S_DB = librosa.power_to_db(mel_spec, ref=np.max) |
|
img_array = (S_DB - S_DB.min()) / (S_DB.max() - S_DB.min() + 1e-6) * 255.0 |
|
img_array = img_array.astype(np.uint8) |
|
return Image.fromarray(img_array).convert("RGB") |
|
|
|
def calculate_bpm(y_cleaned, sr): |
|
onset_env = librosa.onset.onset_strength(y=y_cleaned, sr=sr, aggregate=np.mean) |
|
bpm = librosa.beat.tempo(onset_envelope=onset_env, sr=sr)[0] |
|
return bpm |
|
|
|
|
|
class EndpointHandler: |
|
def __init__(self, path=""): |
|
device = "cuda" if torch.cuda.is_available() else "cpu" |
|
model_directory = os.path.join(path, "modelo-vit-audio-final") |
|
|
|
processor = AutoImageProcessor.from_pretrained(model_directory) |
|
model = AutoModelForImageClassification.from_pretrained(model_directory).to(device) |
|
|
|
self.pipe = pipeline( |
|
"image-classification", |
|
model=model, |
|
image_processor=processor, |
|
device=device |
|
) |
|
print("Pipeline ViT com pré-processamento de áudio (via URL) carregado com sucesso.") |
|
|
|
def __call__(self, data: dict) -> list: |
|
audio_url = data.pop("inputs", None) |
|
if not audio_url or not isinstance(audio_url, str): |
|
return [{"error": "Nenhum 'inputs' (URL de áudio como string) foi fornecido."}] |
|
|
|
try: |
|
print(f"Baixando e processando áudio de: {audio_url}") |
|
|
|
response = requests.get(audio_url) |
|
response.raise_for_status() |
|
audio_data = io.BytesIO(response.content) |
|
|
|
sound = AudioSegment.from_file(audio_data) |
|
sound = sound.set_channels(1) |
|
|
|
sr_original = sound.frame_rate |
|
y = np.array(sound.get_array_of_samples()).astype(np.float32) |
|
y_normalized = y / (2**15) |
|
|
|
if sr_original != TARGET_SR: |
|
y_resampled = librosa.resample(y=y_normalized, orig_sr=sr_original, target_sr=TARGET_SR) |
|
else: |
|
y_resampled = y_normalized |
|
|
|
y_cleaned = butter_bandpass_filter(y_resampled, fs=TARGET_SR) |
|
spectrogram_image = create_spectrogram_image(y_cleaned, TARGET_SR) |
|
|
|
|
|
buffer = io.BytesIO() |
|
sf.write(buffer, y_cleaned, TARGET_SR, format='WAV') |
|
buffer.seek(0) |
|
|
|
|
|
audio_base64 = base64.b64encode(buffer.read()).decode('utf-8') |
|
|
|
bpm = calculate_bpm(y_cleaned, TARGET_SR) |
|
print(f"BPM Estimado: {bpm:.0f}") |
|
|
|
|
|
print("Gerando e codificando gráfico PCG para a resposta...") |
|
time_axis = np.arange(0, len(y_cleaned)) / TARGET_SR |
|
|
|
start_time, end_time = 1.0, 5.0 |
|
start_index, end_index = int(start_time * TARGET_SR), int(end_time * TARGET_SR) |
|
if end_index > len(y_cleaned): |
|
end_index = len(y_cleaned) |
|
start_index = max(0, end_index - int(4 * TARGET_SR)) |
|
|
|
fig, ax = plt.subplots(figsize=(15, 5)) |
|
ax.plot(time_axis, y_cleaned, linewidth=0.7) |
|
ax.set_title("Fonocardiograma (PCG)") |
|
ax.set_xlabel("Tempo (segundos)") |
|
ax.set_ylabel("Amplitude") |
|
ax.grid(True, linestyle='--') |
|
ax.set_xlim(time_axis[start_index], time_axis[end_index - 1]) |
|
|
|
|
|
buf = io.BytesIO() |
|
plt.savefig(buf, format='png', bbox_inches='tight') |
|
plt.close(fig) |
|
buf.seek(0) |
|
|
|
|
|
pcg_image_base64 = base64.b64encode(buf.read()).decode('utf-8') |
|
|
|
|
|
print("Enviando espectrograma para o pipeline de predição...") |
|
prediction = self.pipe(spectrogram_image) |
|
print(f"Predição concluída: {prediction}") |
|
|
|
|
|
final_response = { |
|
"classification_results": prediction, |
|
"bpm_estimated": int(round(bpm)), |
|
"pcg_image_base64": f'data:image/png;base64,{pcg_image_base64}', |
|
"audio_base64": f'data:audio/mp3;base64,{audio_base64}' |
|
} |
|
return [final_response] |
|
|
|
except Exception as e: |
|
error_message = f"Erro ao processar a URL do áudio: {str(e)}" |
|
import traceback |
|
traceback.print_exc() |
|
return [{"error": error_message}] |
|
|