import json
import os
import io
import time
import concurrent.futures as cf
from io import BytesIO
from pathlib import Path
from typing import Any, Dict, List, Set

from datasets import load_dataset
from PIL import Image
from tqdm import tqdm

import vertexai
from vertexai import generative_models
from vertexai.generative_models import GenerativeModel, Part

# Point the Vertex AI SDK at the service-account key file.
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = './gemini_key.json'

generation_config = {
    "max_output_tokens": 2048,
    "temperature": 0.4,
    "top_p": 0.4,
    "top_k": 32,
}

# Block responses at MEDIUM severity and above for every harm category.
safety_settings = {
    generative_models.HarmCategory.HARM_CATEGORY_HATE_SPEECH:
        generative_models.HarmBlockThreshold.BLOCK_MEDIUM_AND_ABOVE,
    generative_models.HarmCategory.HARM_CATEGORY_DANGEROUS_CONTENT:
        generative_models.HarmBlockThreshold.BLOCK_MEDIUM_AND_ABOVE,
    generative_models.HarmCategory.HARM_CATEGORY_SEXUALLY_EXPLICIT:
        generative_models.HarmBlockThreshold.BLOCK_MEDIUM_AND_ABOVE,
    generative_models.HarmCategory.HARM_CATEGORY_HARASSMENT:
        generative_models.HarmBlockThreshold.BLOCK_MEDIUM_AND_ABOVE,
}

# HTTP status codes that indicate a request timed out.
TIMEOUT_CODES = {408, 504, 524}

DATASETS = [
    "zli12321/MLLM_test",
]

N_GEN = 1          # completions to request per dataset item
retry_delay = 10   # seconds to back off between retries

QUESTION_TEMPLATE = (
    "You are tasked with analyzing an image and answering a question. First engage in an "
    "internal dialogue and include self-reflection or verification in your reasoning process. "
    "Provide your detailed, step-by-step reasoning based on the image, and enclose this part "
    "within <think> </think> tags.\n"
    "Finally, provide a single word or phrase answer to the question in \\boxed{}.\n"
    "The output format should be: <think> reasoning process here </think> "
    "\\boxed{FINAL ANSWER here}.\n"
    "Question: {Question}\n"
)


def is_timeout(err):
    """Return True if the error (or its cause) is a network timeout."""
    return isinstance(err, TimeoutError) or isinstance(
        getattr(err, "__cause__", None), TimeoutError
    )
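
# A minimal sketch of how is_timeout() and retry_delay could gate a retry
# (hypothetical usage; generate() below currently retries on any exception):
#
#   try:
#       responses = model.generate_content(...)
#   except Exception as err:
#       if is_timeout(err):
#           time.sleep(retry_delay)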

# Initialise the Vertex AI SDK with the target GCP project and region.
vertexai.init(project="tencent-gemini-omd01", location="us-central1")

'''Below is for Gemini 2.5 Pro accuracy'''
model = GenerativeModel("gemini-2.5-pro")


def generate(pil_img, query):
    """Send one image plus a text query to Gemini and return the response text."""
    # Re-encode the image as PNG bytes for the Vertex AI Part API.
    buf = BytesIO()
    pil_img.convert("RGB").save(buf, format="PNG")
    png_bytes = buf.getvalue()

    image_part = Part.from_data(
        data=png_bytes,
        mime_type="image/png",
    )

    full = "No Text"
    for _ in range(2):  # one retry on failure
        try:
            responses = model.generate_content(
                contents=[image_part, query],
                generation_config=generation_config,
                safety_settings=safety_settings,
                stream=True,
            )
            # Accumulate the streamed chunks into a single string.
            full = "".join(chunk.text for chunk in responses)
            break  # success: stop retrying
        except Exception as e:
            full = "No Text"
            print(f'Failed generating: {e}')
            time.sleep(5)

    return full
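
# Example direct call (hypothetical file name and question):
#
#   img = Image.open("sample.png")
#   print(generate(img, build_prompt({"problem": "How many birds are visible?"})))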


def generate_answer(image, messages) -> str:
    """
    Replace the body of this function with whatever you use to talk to
    your model (e.g. OpenAI, Ollama, local HF pipeline, etc.).
    Must return a *single* string completion.
    """
    return generate(image, messages)
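
# A minimal sketch of an alternative backend, assuming an Azure OpenAI vision
# deployment (AZURE_ENDPOINT, API_KEY, DEPLOYMENT and data_url are
# placeholders, not defined in this script):
#
#   from openai import AzureOpenAI
#   client = AzureOpenAI(azure_endpoint=AZURE_ENDPOINT, api_key=API_KEY,
#                        api_version="2024-02-01")
#   resp = client.chat.completions.create(
#       model=DEPLOYMENT,
#       messages=[{"role": "user", "content": [
#           {"type": "text", "text": messages},
#           {"type": "image_url", "image_url": {"url": data_url}},
#       ]}],
#   )
#   return resp.choices[0].message.content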


def build_prompt(item) -> str:
    """Fill QUESTION_TEMPLATE with the current question."""
    # str.replace is used instead of str.format because the template
    # contains literal braces (\boxed{}).
    return QUESTION_TEMPLATE.replace("{Question}", item["problem"])
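
# e.g. build_prompt({"problem": "What colour is the car?"}) returns the full
# instruction block ending in "Question: What colour is the car?".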


def to_rgb(img: Image.Image) -> Image.Image:
    """Return the image unchanged if it is already RGB, otherwise convert it."""
    return img if img.mode == "RGB" else img.convert("RGB")


def _load_partial(out_path: Path) -> List[Dict[str, Any]]:
    """Load previously saved results so an interrupted run can resume."""
    if not out_path.exists():
        return []
    try:
        with out_path.open("r", encoding="utf-8") as f:
            return json.load(f)
    except Exception as err:
        print(f"[warn] {out_path} could not be read ({err}) – ignoring.")
        return []


def run_dataset(dataset_id: str, n_gen: int = 1) -> None:
    """Run the generation loop for one dataset, resuming if output exists."""
    print(f"\n=== Processing {dataset_id} ===")

    slug = dataset_id.split("/")[-1]
    DATA_OUT = Path(f"./gemini-cot/{slug}.json")
    DATA_OUT.parent.mkdir(parents=True, exist_ok=True)

    # Resume support: collect the indices that already have saved predictions.
    results: List[Dict[str, Any]] = _load_partial(DATA_OUT)
    done_idx: Set[int] = {rec["index"] for rec in results}
    print(f"[{slug}] found {len(done_idx)} previously processed items")

    # Counting/Hotpot-style datasets only ship a train split; otherwise try
    # train first and fall back to the test split.
    if 'count' in dataset_id or 'hotpot' in dataset_id:
        ds = load_dataset(dataset_id, split="train")
    else:
        try:
            ds = load_dataset(dataset_id, split="train")
        except Exception:
            ds = load_dataset(dataset_id, split="test", trust_remote_code=True)

    # Decode stored image bytes into PIL images. Some datasets hold a list of
    # images per row, others a single image dict.
    df = ds.to_pandas()
    try:
        df["pil_images"] = df["images"].apply(
            lambda lst: [Image.open(io.BytesIO(d["bytes"])).convert("RGB") for d in lst]
        )
        images = [imgs[0] for imgs in df["pil_images"]]
    except Exception:
        df["pil_images"] = df["images"].apply(
            lambda d: Image.open(io.BytesIO(d["bytes"])).convert("RGB")
        )
        images = list(df["pil_images"])

    # Fire n_gen parallel requests per item and checkpoint results to disk
    # after every item so an interrupted run can resume.
    with cf.ThreadPoolExecutor(max_workers=n_gen) as pool:
        for idx, item in enumerate(
            tqdm(ds, desc=f"generating · {slug}",
                 initial=len(done_idx), total=len(ds))
        ):
            if idx in done_idx:
                continue

            prompt_txt = build_prompt(item)

            futures = [pool.submit(generate_answer, images[idx], prompt_txt)
                       for _ in range(n_gen)]
            answers = [ans for ans in (f.result() for f in futures) if ans]

            if answers:
                results.append(
                    dict(
                        index=idx,
                        problem=item["problem"],
                        solution=item["answer"],
                        predictions=answers,
                    )
                )
                DATA_OUT.write_text(json.dumps(results, indent=2, ensure_ascii=False))
    print(f"✅ {slug}: finished {len(results)} samples → {DATA_OUT}")


def run_all(
    datasets: list,
    default_n_gen: int = 1,
    max_workers: int | None = None,
) -> None:
    """
    Launch `run_dataset` for every entry in *datasets*.

    `datasets` may contain:
      • "foo/bar"      -> uses default_n_gen
      • ("foo/bar", 8) -> uses 8 for that dataset
    """
    if max_workers is None:
        max_workers = min(len(datasets), 32)

    print(f"\nLaunching {len(datasets)} dataset jobs "
          f"({max_workers} workers)…\n")

    with cf.ThreadPoolExecutor(max_workers=max_workers) as pool:
        fut_to_name = {}
        for entry in datasets:
            if isinstance(entry, tuple):
                ds_id, n_gen = entry
            else:
                ds_id, n_gen = entry, default_n_gen
            fut = pool.submit(run_dataset, ds_id, n_gen)
            fut_to_name[fut] = ds_id

        for fut in cf.as_completed(fut_to_name):
            name = fut_to_name[fut]
            try:
                fut.result()
            except Exception as exc:
                print(f"❌ {name} failed: {exc!r}")
            else:
                print(f"✅ {name} done")


if __name__ == "__main__":
    run_all(DATASETS, max_workers=min(len(DATASETS), (os.cpu_count() or 1) * 2))
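
    # To request several completions for one dataset, pass a tuple instead, e.g.:
    #   run_all([("zli12321/MLLM_test", 4)])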