from typing import List, Union import torch import numpy as np from transformers.utils import is_flash_attn_2_available from transformers.models.qwen2 import Qwen2Model from transformers.models.qwen2.tokenization_qwen2_fast import Qwen2TokenizerFast from transformers.models.qwen2.configuration_qwen2 import Qwen2Config INSTRUCTION_CONFIG = { "nl2code": { "query": "Find the most relevant code snippet given the following query:\n", "passage": "Candidate code snippet:\n" }, "qa": { "query": "Find the most relevant answer given the following question:\n", "passage": "Candidate answer:\n" }, "code2code": { "query": "Find an equivalent code snippet given the following code snippet:\n", "passage": "Candidate code snippet:\n" }, "code2nl": { "query": "Find the most relevant comment given the following code snippet:\n", "passage": "Candidate comment:\n" }, "code2completion": { "query": "Find the most relevant completion given the following start of code snippet:\n", "passage": "Candidate completion:\n" } } def batch(iterable, n=1): items = len(iterable) for ndx in range(0, items, n): yield iterable[ndx : min(ndx + n, items)] def last_token_pooling(model_output, attention_mask): token_embeddings = model_output[0] left_padding = (attention_mask[:, -1].sum() == attention_mask.shape[0]) if left_padding: return token_embeddings[:, -1] else: sequence_lengths = attention_mask.sum(dim=1) - 1 batch_size = token_embeddings.shape[0] return token_embeddings[torch.arange(batch_size, device=token_embeddings.device), sequence_lengths].float() class JinaEmbeddingsC1Model(Qwen2Model): def __init__(self, config: Qwen2Config): Qwen2Model.__init__(self, config) self.instructions = INSTRUCTION_CONFIG def forward( self, input_ids: torch.LongTensor, attention_mask: torch.Tensor, **kwargs ) -> List[torch.Tensor]: """ Forward pass through the model. """ batch_model_output = super().forward( input_ids=input_ids, attention_mask=attention_mask, **kwargs ) batch_sentence_embeddings = last_token_pooling( batch_model_output, attention_mask ) return batch_sentence_embeddings def encode( self, sentences: List[str], batch_size: int = 32, max_length: int = 32768, task: str = "nl2code", prompt_name: str = "query", return_numpy: bool = False, truncate_dim: int = 896, ) -> Union[np.ndarray, List[torch.Tensor]]: """ Encodes a list of texts into embeddings. Args: sentences: list of text strings to encode batch_size: Number of texts to process at once max_length: Maximum token length for text processing task: Type of retrieval task ('nl2code', 'qa', or 'code2code') prompt_name: Type of text being encoded ('query' or 'passage') return_numpy: Whether to return numpy arrays instead of torch tensors truncate_dim: Dimension to truncate embeddings to (64, 128, 256, 512, or 896) Returns: List of text embeddings as tensors or numpy arrays """ assert task in self.config.task_names, \ f"Invalid task: {task}. Must be one of {self.config.task_names}." assert prompt_name in self.config.prompt_names, \ f"Invalid prompt name: {prompt_name}. Must be one of {self.config.prompt_names}." assert truncate_dim in self.config.matryoshka_dims, \ f"Invalid embedding dimension: {truncate_dim}. Must be one of {self.config.matryoshka_dims}." instruction = self.instructions[task][prompt_name] sentences = [f'{instruction}{sentence}' for sentence in sentences] embeddings = [] self.eval() with torch.inference_mode(): for batch_of_sentences in batch(sentences, n=batch_size): batch_encoded_input = self.tokenizer( batch_of_sentences, padding=True, truncation=True, return_tensors="pt", max_length=max_length ).to(self.device) batch_sentence_embeddings = self( **batch_encoded_input, output_attentions=False, return_dict=True, max_length=max_length ) batch_sentence_embeddings = batch_sentence_embeddings[:, :truncate_dim] batch_sentence_embeddings = torch.nn.functional.normalize( batch_sentence_embeddings, p=2, dim=-1 ).to("cpu") embeddings.append(batch_sentence_embeddings) if return_numpy: return np.concatenate([b.numpy() for b in embeddings], axis=0) return [t for b in embeddings for t in torch.unbind(b, dim=0)] @classmethod def from_pretrained( cls, pretrained_model_name_or_path, *args, **kwargs, ): """ Loads a pretrained model. """ if "torch_dtype" not in kwargs: kwargs["torch_dtype"] = "auto" if "attn_implementation" not in kwargs: kwargs["attn_implementation"] = "flash_attention_2" if is_flash_attn_2_available() else "sdpa" model = super().from_pretrained( pretrained_model_name_or_path, *args, **kwargs ) model.tokenizer = Qwen2TokenizerFast.from_pretrained( pretrained_model_name_or_path, trust_remote_code=True ) return model