|
from typing import List, Union |
|
|
|
import torch |
|
import numpy as np |
|
|
|
from transformers.utils import is_flash_attn_2_available |
|
from transformers.models.qwen2 import Qwen2Model |
|
from transformers.models.qwen2.tokenization_qwen2_fast import Qwen2TokenizerFast |
|
from transformers.models.qwen2.configuration_qwen2 import Qwen2Config |
|
|
|
|
|
# Prompt prefixes prepended to raw text before tokenization.
# Outer key: retrieval task name. Inner key: role of the text being encoded
# ("query" or "passage"). Every prefix ends with a newline so the user text
# starts on its own line.
INSTRUCTION_CONFIG = {
    "nl2code": {
        "query": "Find the most relevant code snippet given the following query:\n",
        "passage": "Candidate code snippet:\n",
    },
    "qa": {
        "query": "Find the most relevant answer given the following question:\n",
        "passage": "Candidate answer:\n",
    },
    "code2code": {
        "query": "Find an equivalent code snippet given the following code snippet:\n",
        "passage": "Candidate code snippet:\n",
    },
    "code2nl": {
        "query": "Find the most relevant comment given the following code snippet:\n",
        "passage": "Candidate comment:\n",
    },
    "code2completion": {
        "query": "Find the most relevant completion given the following start of code snippet:\n",
        "passage": "Candidate completion:\n",
    },
}
|
|
|
|
|
def batch(iterable, n=1):
    """Yield consecutive slices of ``iterable`` holding at most ``n`` items.

    Args:
        iterable: A sized, sliceable sequence (``len()`` and slicing are used).
        n: Maximum number of items per yielded slice.

    Yields:
        Slices of ``iterable`` in order; the final slice may be shorter.
    """
    total = len(iterable)
    starts = range(0, total, n)
    for start in starts:
        stop = min(start + n, total)
        yield iterable[start:stop]
|
|
|
|
|
def last_token_pooling(model_output, attention_mask):
    """Pool each sequence to a single vector taken at its last attended token.

    Args:
        model_output: Model output whose first element (``model_output[0]``)
            holds the per-token embeddings, indexed as (batch, position, ...).
        attention_mask: 2-D mask with one row per sequence. Row sums are used
            as the count of attended tokens.
            NOTE(review): the right-padding branch assumes the attended
            tokens are contiguous from position 0 - confirm against callers.

    Returns:
        A float32 tensor holding one embedding per sequence. The result is
        cast to float32 regardless of which padding branch is taken, so the
        output dtype does not depend on how a particular batch was padded.
    """
    token_embeddings = model_output[0]

    # If every row is attended at the final position, no row is padded on the
    # right, so the final position is the last real token for all rows.
    left_padding = attention_mask[:, -1].sum() == attention_mask.shape[0]
    if left_padding:
        pooled = token_embeddings[:, -1]
    else:
        # Index of the last attended token in each row.
        sequence_lengths = attention_mask.sum(dim=1) - 1
        batch_size = token_embeddings.shape[0]
        row_index = torch.arange(batch_size, device=token_embeddings.device)
        pooled = token_embeddings[row_index, sequence_lengths]

    # Both branches are upcast: previously only the right-padding branch was,
    # which left half-precision tensors in the left-padding case.
    return pooled.float()
|
|
|
|
|
class JinaEmbeddingsC1Model(Qwen2Model):
    """Qwen2 backbone that returns one last-token-pooled embedding per input.

    ``forward`` runs the underlying ``Qwen2Model`` and pools its output with
    ``last_token_pooling``. ``encode`` is a convenience wrapper that prefixes
    texts with a task instruction, tokenizes them with ``self.tokenizer``
    (attached by ``from_pretrained``), and returns truncated, L2-normalised
    embeddings.
    """

    def __init__(self, config: Qwen2Config):
        super().__init__(config)
        # Prompt prefixes, looked up as instructions[task][prompt_name].
        self.instructions = INSTRUCTION_CONFIG

    def forward(
        self,
        input_ids: torch.LongTensor,
        attention_mask: torch.Tensor,
        **kwargs
    ) -> torch.Tensor:
        """Run the backbone and pool each sequence to a single embedding.

        Args:
            input_ids: Token ids passed through to ``Qwen2Model.forward``.
            attention_mask: Attention mask passed to the backbone and reused
                to locate each sequence's last attended token.
            **kwargs: Extra keyword arguments forwarded unchanged to
                ``Qwen2Model.forward``.

        Returns:
            The tensor produced by ``last_token_pooling`` (one embedding per
            input sequence).
        """
        batch_model_output = super().forward(
            input_ids=input_ids,
            attention_mask=attention_mask,
            **kwargs
        )
        batch_sentence_embeddings = last_token_pooling(
            batch_model_output, attention_mask
        )
        return batch_sentence_embeddings

    def encode(
        self,
        sentences: List[str],
        batch_size: int = 32,
        max_length: int = 32768,
        task: str = "nl2code",
        prompt_name: str = "query",
        return_numpy: bool = False,
        truncate_dim: int = 896,
    ) -> Union[np.ndarray, List[torch.Tensor]]:
        """Encode a list of texts into L2-normalised embeddings.

        Puts the model in eval mode and runs under ``torch.inference_mode``.

        Args:
            sentences: List of text strings to encode.
            batch_size: Number of texts to process at once.
            max_length: Maximum token length; longer inputs are truncated by
                the tokenizer.
            task: Retrieval task selecting the instruction prefix. Must be in
                ``self.config.task_names``; the prefixes defined in this file
                are 'nl2code', 'qa', 'code2code', 'code2nl' and
                'code2completion'.
            prompt_name: Role of the text being encoded ('query' or
                'passage'). Must be in ``self.config.prompt_names``.
            return_numpy: Whether to return one numpy array instead of a list
                of torch tensors.
            truncate_dim: Number of leading embedding dimensions to keep
                before normalisation. Must be in
                ``self.config.matryoshka_dims``.

        Returns:
            If ``return_numpy`` is true, a float32 array with one row per
            input text (shape ``(0, truncate_dim)`` for empty input).
            Otherwise a list with one float32 CPU tensor per input text.

        Raises:
            AssertionError: If ``task``, ``prompt_name`` or ``truncate_dim``
                is not allowed by the model config.
        """
        # NOTE: validation is assert-based, so it is skipped under ``python -O``.
        assert task in self.config.task_names, \
            f"Invalid task: {task}. Must be one of {self.config.task_names}."
        assert prompt_name in self.config.prompt_names, \
            f"Invalid prompt name: {prompt_name}. Must be one of {self.config.prompt_names}."
        assert truncate_dim in self.config.matryoshka_dims, \
            f"Invalid embedding dimension: {truncate_dim}. Must be one of {self.config.matryoshka_dims}."

        instruction = self.instructions[task][prompt_name]
        sentences = [f'{instruction}{sentence}' for sentence in sentences]
        embeddings = []

        self.eval()

        with torch.inference_mode():
            for batch_of_sentences in batch(sentences, n=batch_size):
                batch_encoded_input = self.tokenizer(
                    batch_of_sentences,
                    padding=True,
                    truncation=True,
                    return_tensors="pt",
                    max_length=max_length
                ).to(self.device)

                # max_length is enforced by the tokenizer above; it is not a
                # model-forward argument, so it is not passed on here.
                batch_sentence_embeddings = self(
                    **batch_encoded_input,
                    output_attentions=False,
                    return_dict=True,
                )

                # Upcast before normalising so the result is always float32:
                # half-precision tensors (e.g. bfloat16) cannot be converted
                # with Tensor.numpy().
                batch_sentence_embeddings = batch_sentence_embeddings[:, :truncate_dim].float()
                batch_sentence_embeddings = torch.nn.functional.normalize(
                    batch_sentence_embeddings, p=2, dim=-1
                ).to("cpu")

                embeddings.append(batch_sentence_embeddings)

        if return_numpy:
            if not embeddings:
                # np.concatenate raises on an empty list; return an empty
                # matrix with the requested width instead.
                return np.empty((0, truncate_dim), dtype=np.float32)
            return np.concatenate([b.numpy() for b in embeddings], axis=0)
        return [t for b in embeddings for t in torch.unbind(b, dim=0)]

    @classmethod
    def from_pretrained(
        cls,
        pretrained_model_name_or_path,
        *args,
        **kwargs,
    ):
        """Load pretrained weights and attach the matching tokenizer.

        Defaults applied when the caller does not set them:
        ``torch_dtype="auto"`` and ``attn_implementation`` set to
        ``"flash_attention_2"`` if available, otherwise ``"sdpa"``.

        Args:
            pretrained_model_name_or_path: Model id or local path, used for
                both the weights and the tokenizer.
            *args: Forwarded to the parent ``from_pretrained``.
            **kwargs: Forwarded to the parent ``from_pretrained``. The
                download-related options ``cache_dir``, ``force_download``,
                ``local_files_only``, ``token`` and ``revision`` are also
                forwarded to the tokenizer loader when supplied.

        Returns:
            The loaded model with a ``tokenizer`` attribute set.
        """
        kwargs.setdefault("torch_dtype", "auto")

        if "attn_implementation" not in kwargs:
            kwargs["attn_implementation"] = "flash_attention_2" if is_flash_attn_2_available() else "sdpa"

        model = super().from_pretrained(
            pretrained_model_name_or_path, *args, **kwargs
        )

        # Load the tokenizer from the same snapshot / with the same
        # credentials as the weights; otherwise a pinned revision or a
        # private repo would resolve differently for the tokenizer.
        hub_option_names = (
            "cache_dir",
            "force_download",
            "local_files_only",
            "token",
            "revision",
        )
        tokenizer_kwargs = {
            name: kwargs[name] for name in hub_option_names if name in kwargs
        }

        model.tokenizer = Qwen2TokenizerFast.from_pretrained(
            pretrained_model_name_or_path,
            trust_remote_code=True,
            **tokenizer_kwargs,
        )

        return model
|
|
|
|