from threading import Thread
import logging
import time

import torch
import pynvml
from transformers import (
    AutoProcessor,
    Qwen2VLForConditionalGeneration,
    Qwen2_5_VLForConditionalGeneration,
    TextIteratorStreamer,
)
from qwen_vl_utils import process_vision_info


class Qwen2VL:
    def __init__(self, model_id):
        self.model_id = model_id
        # Qwen2.5-VL checkpoints need their own model class; otherwise fall back to Qwen2-VL.
        if "2.5" in model_id:
            self.model = Qwen2_5_VLForConditionalGeneration.from_pretrained(
                model_id, torch_dtype=torch.float16, device_map="auto"
            )
        else:
            self.model = Qwen2VLForConditionalGeneration.from_pretrained(
                model_id, torch_dtype=torch.float16, device_map="auto"
            )
        self.processor = AutoProcessor.from_pretrained(model_id)

        # Set up NVML so generate() can report GPU memory usage.
        self.handle = None
        if torch.cuda.is_available():
            try:
                pynvml.nvmlInit()
                device = next(self.model.parameters()).device
                device_id = device.index if device.index is not None else 0
                self.handle = pynvml.nvmlDeviceGetHandleByIndex(device_id)
            except Exception as e:
                logging.error(f"Failed to initialize NVML: {e}")

    def __del__(self):
        if hasattr(self, "handle") and self.handle:
            try:
                pynvml.nvmlShutdown()
            except Exception:
                pass

    def generate(self, video, prompt):
        # Preparation for inference: frames are passed as file:// URIs and
        # downscaled so the visual token count stays manageable.
        video_paths = [f"file://{path}" for path in video]
        messages = [
            {
                "role": "user",
                "content": [
                    {
                        "type": "video",
                        "video": video_paths,
                        "resized_height": 280,
                        "resized_width": 420,
                    },
                    {"type": "text", "text": prompt},
                ],
            }
        ]
        text = self.processor.apply_chat_template(
            messages, tokenize=False, add_generation_prompt=True
        )
        image_inputs, video_inputs = process_vision_info(messages)
        inputs = self.processor(
            text=[text],
            images=image_inputs,
            videos=video_inputs,
            padding=True,
            return_tensors="pt",
        )
        # Move inputs to the device holding the model's first parameters
        # (with device_map="auto" this is where the embeddings live).
        inputs = inputs.to(self.model.device)
        logging.info(f"Prompt token length: {len(inputs.input_ids[0])}")

        # Run generation in a background thread and stream tokens as they arrive.
        # The streamer needs the tokenizer, not the full processor.
        streamer = TextIteratorStreamer(
            self.processor.tokenizer, skip_prompt=True, skip_special_tokens=True
        )
        generation_kwargs = dict(**inputs, streamer=streamer, max_new_tokens=256)
        thread = Thread(target=self.model.generate, kwargs=generation_kwargs)
        thread.start()

        full_response = ""
        print("Response: ", end="")
        first_token_time = None
        for new_text in streamer:
            if first_token_time is None:
                first_token_time = time.time()
            full_response += new_text
            print(new_text, end="", flush=True)
        print()
        thread.join()
        end_time = time.time()

        # Throughput is measured from the first streamed token, so it excludes
        # prefill latency.
        generation_time = end_time - first_token_time if first_token_time is not None else 0
        # Re-tokenizing the decoded text approximates the generated token count.
        num_generated_tokens = len(self.processor.tokenizer(full_response).input_ids)
        tokens_per_second = num_generated_tokens / generation_time if generation_time > 0 else 0

        # NVML reports device-wide memory in use at this moment (just after
        # generation), which serves as a proxy for the peak.
        peak_memory_mb = 0
        if self.handle:
            mem_info = pynvml.nvmlDeviceGetMemoryInfo(self.handle)
            peak_memory_mb = mem_info.used / (1024 * 1024)

        return {
            "response": full_response,
            "tokens_per_second": tokens_per_second,
            "peak_gpu_memory_mb": peak_memory_mb,
            "num_generated_tokens": num_generated_tokens,
        }
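

# Example usage: a minimal sketch of how this class is meant to be driven.
# The checkpoint name and frame paths below are placeholders (assumptions),
# not values taken from this module.
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    model = Qwen2VL("Qwen/Qwen2.5-VL-7B-Instruct")  # hypothetical model ID
    result = model.generate(
        video=["/tmp/frames/frame_0001.jpg", "/tmp/frames/frame_0002.jpg"],  # hypothetical frames
        prompt="Describe what happens in this video.",
    )
    print(
        f"\n{result['num_generated_tokens']} tokens "
        f"at {result['tokens_per_second']:.2f} tok/s, "
        f"GPU memory: {result['peak_gpu_memory_mb']:.0f} MB"
    )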