import logging
import time
from threading import Thread

import pynvml
import torch
from qwen_vl_utils import process_vision_info
from transformers import (
    AutoProcessor,
    Qwen2VLForConditionalGeneration,
    Qwen2_5_VLForConditionalGeneration,
    TextIteratorStreamer,
)

class Qwen2VL:
    def __init__(self, model_id):
        self.model_id = model_id
        # Qwen2.5-VL checkpoints use a different model class than Qwen2-VL ones.
        if "2.5" in model_id:
            self.model = Qwen2_5_VLForConditionalGeneration.from_pretrained(
                model_id, torch_dtype="float16", device_map="auto"
            )
        else:
            self.model = Qwen2VLForConditionalGeneration.from_pretrained(
                model_id, torch_dtype="float16", device_map="auto"
            )
        self.processor = AutoProcessor.from_pretrained(model_id)

        # Initialize NVML so generate() can report GPU memory usage.
        self.handle = None
        if torch.cuda.is_available():
            try:
                pynvml.nvmlInit()
                device_id = next(self.model.parameters()).device.index
                # device.index is None when the first parameter is not on a CUDA
                # device (e.g. CPU offload); fall back to GPU 0 in that case.
                self.handle = pynvml.nvmlDeviceGetHandleByIndex(
                    device_id if device_id is not None else 0
                )
            except Exception as e:
                logging.error(f"Failed to initialize NVML: {e}")

    def __del__(self):
        # Release the NVML context if it was initialized.
        if hasattr(self, "handle") and self.handle:
            try:
                pynvml.nvmlShutdown()
            except Exception:
                pass

    def generate(self, video, prompt):
        """Generate a streamed response for a list of video frame paths and a text prompt."""
        start_time = time.time()

        # qwen_vl_utils expects local files as file:// URIs.
        video_paths = [f"file://{path}" for path in video]
        messages = [
            {
                "role": "user",
                "content": [
                    {
                        "type": "video",
                        "video": video_paths,
                        "resized_height": 280,
                        "resized_width": 420,
                    },
                    {"type": "text", "text": prompt},
                ],
            }
        ]
        text = self.processor.apply_chat_template(
            messages, tokenize=False, add_generation_prompt=True
        )

        image_inputs, video_inputs = process_vision_info(messages)
        inputs = self.processor(
            text=[text],
            images=image_inputs,
            videos=video_inputs,
            padding=True,
            return_tensors="pt",
        )
        inputs = inputs.to(self.model.device)  # keep inputs on the same device as the model
        logging.info(f"Prompt token length: {len(inputs.input_ids[0])}")
        streamer = TextIteratorStreamer(self.processor, skip_prompt=True, skip_special_tokens=True)
        generation_kwargs = dict(
            **inputs,
            streamer=streamer,
            max_new_tokens=256
        )

        thread = Thread(target=self.model.generate, kwargs=generation_kwargs)
        thread.start()
        full_response = ""
        print("Response: ", end="")
        first_token_time = None
        for new_text in streamer:
            if first_token_time is None:
                first_token_time = time.time()
            full_response += new_text
            print(new_text, end="", flush=True)
        print()
        thread.join()
        end_time = time.time()

        # Measure throughput from the first streamed token to the end of generation.
        if first_token_time is not None:
            generation_time = end_time - first_token_time
        else:
            generation_time = 0

        # Re-tokenize the decoded text to approximate the number of generated tokens.
        num_generated_tokens = len(self.processor.tokenizer(full_response).input_ids)
        tokens_per_second = num_generated_tokens / generation_time if generation_time > 0 else 0

        # NVML reports the memory currently in use; taken here as the peak for this run.
        peak_memory_mb = 0
        if self.handle:
            mem_info = pynvml.nvmlDeviceGetMemoryInfo(self.handle)
            peak_memory_mb = mem_info.used / (1024 * 1024)

        return {
            "response": full_response,
            "tokens_per_second": tokens_per_second,
            "peak_gpu_memory_mb": peak_memory_mb,
            "num_generated_tokens": num_generated_tokens,
        }
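
# Example usage: a minimal sketch of how the class above can be called, not part of
# the original module. The checkpoint name and frame paths are placeholders; any
# Qwen2-VL / Qwen2.5-VL checkpoint and any list of extracted frame images should work.
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)

    # Hypothetical checkpoint and frame paths, shown only to illustrate the API.
    model = Qwen2VL("Qwen/Qwen2-VL-7B-Instruct")
    frames = ["/tmp/clip/frame_0001.jpg", "/tmp/clip/frame_0002.jpg"]

    stats = model.generate(frames, "Describe what happens in this video.")
    print(
        f"\n{stats['num_generated_tokens']} tokens "
        f"at {stats['tokens_per_second']:.1f} tok/s, "
        f"GPU memory: {stats['peak_gpu_memory_mb']:.0f} MB"
    )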