aimedlab-pulse-hf / handler.bak.py
ubden's picture
Upload 17 files
1049a4d verified
"""
PULSE-7B Enhanced Handler
Ubden® Team - Edited by https://github.com/ck-cankurt
Support: Text, Image URLs, and Base64 encoded images
"""
import torch
from typing import Dict, List, Any
import base64
from io import BytesIO
from PIL import Image
import requests
import time
# Import utilities if available
try:
from utils import (
performance_monitor,
validate_image_input,
sanitize_parameters,
get_system_info,
create_health_check,
deepseek_client
)
UTILS_AVAILABLE = True
except ImportError:
UTILS_AVAILABLE = False
deepseek_client = None
print("⚠️ Utils module not found - performance monitoring and DeepSeek integration disabled")
# Try to import LLaVA modules for proper conversation handling
try:
from llava.constants import IMAGE_TOKEN_INDEX, DEFAULT_IMAGE_TOKEN
from llava.conversation import conv_templates, SeparatorStyle
from llava.mm_utils import tokenizer_image_token, process_images, KeywordsStoppingCriteria
LLAVA_AVAILABLE = True
print("✅ LLaVA modules imported successfully")
except ImportError:
LLAVA_AVAILABLE = False
print("⚠️ LLaVA modules not available - using basic text processing")
class EndpointHandler:
def __init__(self, path=""):
"""
Hey there! Let's get this PULSE-7B model up and running.
We'll load it from the HuggingFace hub directly, so no worries about local files.
Args:
path: Model directory path (we actually ignore this and load from HF hub)
"""
print("🚀 Starting up PULSE-7B handler...")
print("📝 Enhanced by Ubden® Team - github.com/ck-cankurt")
import sys
print(f"🔧 Python version: {sys.version}")
print(f"🔧 PyTorch version: {torch.__version__}")
# Check transformers version
try:
import transformers
print(f"🔧 Transformers version: {transformers.__version__}")
# PULSE LLaVA works with transformers==4.37.2
if transformers.__version__ == "4.37.2":
print("✅ Using PULSE LLaVA compatible version (4.37.2)")
elif "dev" in transformers.__version__ or "git" in str(transformers.__version__):
print("⚠️ Using development version - may conflict with PULSE LLaVA")
else:
print("⚠️ Using different version - PULSE LLaVA prefers 4.37.2")
except Exception as e:
print(f"❌ Error checking transformers version: {e}")
print(f"🔧 CUDA available: {torch.cuda.is_available()}")
if torch.cuda.is_available():
print(f"🔧 CUDA device: {torch.cuda.get_device_name(0)}")
# Let's see what hardware we're working with
self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"🖥️ Running on: {self.device}")
try:
# First attempt - PULSE demo's exact approach
if LLAVA_AVAILABLE:
print("📦 Using PULSE demo's load_pretrained_model approach...")
from llava.model.builder import load_pretrained_model
from llava.mm_utils import get_model_name_from_path
model_path = "PULSE-ECG/PULSE-7B"
model_name = get_model_name_from_path(model_path)
self.tokenizer, self.model, self.image_processor, self.context_len = load_pretrained_model(
model_path=model_path,
model_base=None,
model_name=model_name,
load_8bit=False,
load_4bit=False
)
# Move model to device like demo
self.model = self.model.to(self.device)
self.use_pipeline = False
print("✅ Model loaded successfully with PULSE demo's approach!")
print(f"📸 Image processor: {type(self.image_processor).__name__}")
else:
raise ImportError("LLaVA modules not available")
except Exception as e:
print(f"⚠️ PULSE demo approach failed: {e}")
print("🔄 Falling back to pipeline...")
try:
# Fallback - using pipeline
from transformers import pipeline
print("📦 Fetching model from HuggingFace Hub...")
self.pipe = pipeline(
"text-generation",
model="PULSE-ECG/PULSE-7B",
torch_dtype=torch.float16 if torch.cuda.is_available() else torch.float32,
device=0 if torch.cuda.is_available() else -1,
trust_remote_code=True,
model_kwargs={
"low_cpu_mem_usage": True,
"use_safetensors": True
}
)
self.use_pipeline = True
self.image_processor = None
print("✅ Model loaded successfully via pipeline!")
except Exception as e2:
print(f"😓 Pipeline also failed: {e2}")
try:
# Last resort - manual loading
from transformers import AutoTokenizer, LlamaForCausalLM
print("📖 Manual loading as last resort...")
self.tokenizer = AutoTokenizer.from_pretrained(
"PULSE-ECG/PULSE-7B",
trust_remote_code=True
)
self.model = LlamaForCausalLM.from_pretrained(
"PULSE-ECG/PULSE-7B",
torch_dtype=torch.float16 if torch.cuda.is_available() else torch.float32,
device_map="auto",
low_cpu_mem_usage=True,
trust_remote_code=True
)
if self.tokenizer.pad_token is None:
self.tokenizer.pad_token = self.tokenizer.eos_token
self.tokenizer.pad_token_id = self.tokenizer.eos_token_id
self.model.eval()
self.use_pipeline = False
self.image_processor = None
print("✅ Model loaded manually!")
except Exception as e3:
print(f"😓 All approaches failed: {e3}")
self.pipe = None
self.model = None
self.tokenizer = None
self.image_processor = None
self.use_pipeline = None
# Final status report
print("\n🔍 Model Loading Status Report:")
print(f" - use_pipeline: {self.use_pipeline}")
print(f" - model: {'✅ Loaded' if hasattr(self, 'model') and self.model is not None else '❌ None'}")
print(f" - tokenizer: {'✅ Loaded' if hasattr(self, 'tokenizer') and self.tokenizer is not None else '❌ None'}")
print(f" - image_processor: {'✅ Loaded' if hasattr(self, 'image_processor') and self.image_processor is not None else '❌ None'}")
print(f" - pipe: {'✅ Loaded' if hasattr(self, 'pipe') and self.pipe is not None else '❌ None'}")
# Check if any model component loaded successfully
has_model = hasattr(self, 'model') and self.model is not None
has_tokenizer = hasattr(self, 'tokenizer') and self.tokenizer is not None
has_pipe = hasattr(self, 'pipe') and self.pipe is not None
has_image_processor = hasattr(self, 'image_processor') and self.image_processor is not None
if not (has_model or has_tokenizer or has_pipe):
print("💥 CRITICAL: No model components loaded successfully!")
else:
print("✅ At least one model component loaded successfully")
if has_image_processor:
print("🖼️ Vision capabilities available!")
else:
print("⚠️ No image processor - text-only mode")
def is_valid_image_format(self, filename_or_url):
"""Validate image format like PULSE demo"""
# Demo's supported formats
image_extensions = ["jpg", "jpeg", "png", "bmp", "gif", "tiff", "webp", "heic", "heif", "jfif", "svg", "eps", "raw"]
if filename_or_url.startswith(('http://', 'https://')):
# For URLs, check the extension or content-type
ext = filename_or_url.split('.')[-1].split('?')[0].lower()
return ext in image_extensions
else:
# For base64 or local files
return True # Base64 will be validated during decode
def process_image_input(self, image_input):
"""
Handle both URL and base64 image inputs exactly like PULSE demo
Args:
image_input: Can be a URL string or base64 encoded image
Returns:
PIL Image object or None if something goes wrong
"""
try:
# Check if it's a URL (starts with http/https)
if isinstance(image_input, str) and (image_input.startswith('http://') or image_input.startswith('https://')):
print(f"🌐 Fetching image from URL: {image_input[:50]}...")
# Validate format like demo
if not self.is_valid_image_format(image_input):
print("❌ Invalid image format in URL")
return None
# Demo's exact image loading approach
response = requests.get(image_input, timeout=15)
if response.status_code == 200:
image = Image.open(BytesIO(response.content)).convert("RGB")
print(f"✅ Image downloaded successfully! Size: {image.size}")
return image
else:
print(f"❌ Failed to load image: status {response.status_code}")
return None
# Must be base64 then
elif isinstance(image_input, str):
print("🔍 Decoding base64 image...")
# Remove the data URL prefix if it exists
base64_data = image_input
if "base64," in image_input:
base64_data = image_input.split("base64,")[1]
# Clean and validate base64 data
base64_data = base64_data.strip().replace('\n', '').replace('\r', '').replace(' ', '')
try:
image_data = base64.b64decode(base64_data)
image = Image.open(BytesIO(image_data)).convert('RGB')
print(f"✅ Base64 image decoded successfully! Size: {image.size}")
return image
except Exception as decode_error:
print(f"❌ Base64 decode error: {decode_error}")
return None
except Exception as e:
print(f"❌ Couldn't process the image: {e}")
return None
return None
def add_turkish_commentary(self, response: Dict[str, Any], enable_commentary: bool, timeout: int = 30) -> Dict[str, Any]:
"""Add Turkish commentary to the response using DeepSeek API"""
if not enable_commentary:
return response
if not UTILS_AVAILABLE or not deepseek_client:
print("⚠️ DeepSeek client not available - skipping Turkish commentary")
response["commentary_status"] = "unavailable"
return response
if not deepseek_client.is_available():
print("⚠️ DeepSeek API key not configured - skipping Turkish commentary")
response["commentary_status"] = "api_key_missing"
return response
generated_text = response.get("generated_text", "")
if not generated_text:
print("⚠️ No generated text to comment on")
response["commentary_status"] = "no_text"
return response
print("🔄 DeepSeek ile Türkçe yorum ekleniyor...")
commentary_result = deepseek_client.get_turkish_commentary(generated_text, timeout)
if commentary_result["success"]:
response["comment_text"] = commentary_result["comment_text"]
response["commentary_model"] = commentary_result.get("model", "deepseek-chat")
response["commentary_tokens"] = commentary_result.get("tokens_used", 0)
response["commentary_status"] = "success"
print("✅ Türkçe yorum başarıyla eklendi")
else:
response["comment_text"] = ""
response["commentary_error"] = commentary_result["error"]
response["commentary_status"] = "failed"
print(f"❌ Türkçe yorum eklenemedi: {commentary_result['error']}")
return response
def health_check(self) -> Dict[str, Any]:
"""Health check endpoint"""
if UTILS_AVAILABLE:
return create_health_check()
else:
return {
'status': 'healthy',
'model': 'PULSE-7B',
'timestamp': time.time(),
'handler_version': '2.0.0'
}
def __call__(self, data: Dict[str, Any]) -> List[Dict[str, Any]]:
"""
Main processing function - where the magic happens!
Args:
data: Input data with 'inputs' and optional 'parameters'
Returns:
List with the generated response
"""
# Quick check - is our model ready?
if self.use_pipeline is None:
return [{
"generated_text": "Oops! Model couldn't load properly. Please check the deployment settings.",
"error": "Model initialization failed",
"handler": "Ubden® Team Enhanced Handler"
}]
try:
# Parse the inputs - flexible format support
inputs = data.get("inputs", "")
text = ""
image = None
if isinstance(inputs, dict):
# Dictionary input - check for text and image
# Support query field (new) plus original text/prompt fields
text = inputs.get("query", inputs.get("text", inputs.get("prompt", str(inputs))))
# Check for image in various formats
image_input = inputs.get("image", inputs.get("image_url", inputs.get("image_base64", None)))
if image_input:
image = self.process_image_input(image_input)
if image:
# Since we're in text-only mode, create smart ECG context
print(f"🖼️ Image loaded: {image.size[0]}x{image.size[1]} pixels - using text-only ECG analysis mode")
# Create ECG-specific prompt that mimics visual analysis
ecg_context = f"Analyzing an ECG image ({image.size[0]}x{image.size[1]} pixels). "
# Use demo's exact approach - no additional context, just the query
# Model is trained to understand ECG images from text queries
pass # Keep text exactly as received
else:
# Simple string input
text = str(inputs)
if not text:
return [{"generated_text": "Hey, I need some text to work with! Please provide an input."}]
# Get generation parameters - using PULSE-7B demo's exact settings
parameters = data.get("parameters", {})
max_new_tokens = min(parameters.get("max_new_tokens", 1024), 8192) # Demo uses 1024 default
temperature = parameters.get("temperature", 0.05) # Demo uses 0.05 for precise medical analysis
top_p = parameters.get("top_p", 1.0) # Demo uses 1.0 for full vocabulary access
do_sample = parameters.get("do_sample", True) # Demo uses sampling
repetition_penalty = parameters.get("repetition_penalty", 1.0) # Demo default
print(f"🎛️ Generation params: max_tokens={max_new_tokens}, temp={temperature}, top_p={top_p}, do_sample={do_sample}, rep_penalty={repetition_penalty}")
# Check if Turkish commentary is requested (NEW FEATURE)
enable_turkish_commentary = parameters.get("enable_turkish_commentary", False) # Default false
# Using pipeline? Let's go!
if self.use_pipeline:
print(f"🎛️ Pipeline generation: temp={temperature}, tokens={max_new_tokens}")
print(f"📝 Input text: '{text[:100]}...'")
result = self.pipe(
text,
max_new_tokens=max_new_tokens,
min_new_tokens=200, # Force very detailed analysis to match demo
temperature=temperature,
top_p=top_p,
do_sample=do_sample,
repetition_penalty=repetition_penalty,
return_full_text=False # Just the new stuff, not the input
)
# Pipeline returns a list, let's handle it
if isinstance(result, list) and len(result) > 0:
generated_text = result[0].get("generated_text", "").strip()
print(f"🔍 Pipeline debug:")
print(f" - Raw result: '{str(result[0])[:200]}...'")
print(f" - Generated text length: {len(generated_text)}")
# Clean up common issues
if generated_text.startswith(text):
generated_text = generated_text[len(text):].strip()
print("🔧 Removed input text from output")
# Remove common artifacts
generated_text = generated_text.replace("</s>", "").strip()
if not generated_text:
print("❌ Pipeline generated empty text!")
generated_text = "Empty response from pipeline. Please try different parameters."
print(f"✅ Final pipeline text: '{generated_text[:100]}...' (length: {len(generated_text)})")
# Create response
response = {"generated_text": generated_text}
# Add Turkish commentary if requested (NEW FEATURE)
if enable_turkish_commentary:
response = self.add_turkish_commentary(response, True)
return [response]
else:
generated_text = str(result).strip()
# Create response
response = {"generated_text": generated_text}
# Add Turkish commentary if requested (NEW FEATURE)
if enable_turkish_commentary:
response = self.add_turkish_commentary(response, True)
return [response]
# Manual generation mode - using PULSE demo's exact approach
else:
print(f"🔥 Manual generation with PULSE demo logic: temp={temperature}, tokens={max_new_tokens}")
print(f"📝 Input text: '{text[:100]}...'")
# Text-only generation with enhanced ECG context
print("🔤 Using enhanced text-only generation with ECG context")
# Tokenize the enhanced prompt
encoded = self.tokenizer(
text,
return_tensors="pt",
truncation=True,
max_length=4096 # Increased for longer prompts
)
input_ids = encoded["input_ids"].to(self.device)
attention_mask = encoded.get("attention_mask")
if attention_mask is not None:
attention_mask = attention_mask.to(self.device)
print(f"🔍 Enhanced generation debug:")
print(f" - Enhanced prompt length: {len(text)} chars")
print(f" - Input tokens: {input_ids.shape[-1]}")
print(f" - Prompt preview: '{text[:150]}...'")
# Generate with enhanced settings for medical analysis
with torch.no_grad():
outputs = self.model.generate(
input_ids,
attention_mask=attention_mask,
max_new_tokens=max_new_tokens,
min_new_tokens=200, # Force detailed response like demo
temperature=temperature,
top_p=top_p,
do_sample=do_sample,
repetition_penalty=repetition_penalty,
pad_token_id=self.tokenizer.pad_token_id,
eos_token_id=self.tokenizer.eos_token_id,
early_stopping=False
)
# Decode and clean response
generated_ids = outputs[0][input_ids.shape[-1]:]
generated_text = self.tokenizer.decode(
generated_ids,
skip_special_tokens=True,
clean_up_tokenization_spaces=True
).strip()
# Aggressive cleanup of artifacts
generated_text = generated_text.replace("</s>", "").strip()
# Simple cleanup - just remove Answer prefix and parentheses
if generated_text.startswith("(Answer:") and ")" in generated_text:
# Just remove the parentheses and Answer: prefix
end_paren = generated_text.find(")")
answer_content = generated_text[8:end_paren].strip() # Remove "(Answer:"
# Keep the rest of the response if there is any
rest_of_response = generated_text[end_paren+1:].strip()
if rest_of_response:
generated_text = f"{answer_content}. {rest_of_response}"
else:
generated_text = answer_content
elif generated_text.startswith("Answer:"):
generated_text = generated_text[7:].strip()
# Remove only clear training artifacts
cleanup_patterns = [
"In this task",
"I'm asking the respondent",
"The respondent should"
]
for pattern in cleanup_patterns:
if pattern in generated_text:
parts = generated_text.split(pattern)
generated_text = parts[0].strip()
break
# Only provide fallback if response is truly empty or malformed
if len(generated_text) < 10 or generated_text.startswith("7)"):
print("⚠️ Malformed response detected, providing fallback...")
generated_text = "This ECG shows cardiac electrical activity. For accurate interpretation, please consult with a qualified cardiologist who can analyze the specific waveforms, intervals, and morphology patterns."
print(f"✅ Enhanced text-only generation: '{generated_text[:100]}...' (length: {len(generated_text)})")
# Create response
response = {"generated_text": generated_text}
# Add Turkish commentary if requested (NEW FEATURE)
if enable_turkish_commentary:
response = self.add_turkish_commentary(response, True)
return [response]
except Exception as e:
error_msg = f"Something went wrong during generation: {str(e)}"
print(f"❌ {error_msg}")
return [{
"generated_text": "",
"error": error_msg,
"handler": "Ubden® Team Enhanced Handler"
}]