jina-code-embeddings-0.5b / modeling_jina_embeddings_c1.py
dariakryvosheieva's picture
upload last-token model
c4395ff
raw
history blame
5.91 kB
from typing import List, Union
import torch
import numpy as np
from transformers.utils import is_flash_attn_2_available
from transformers.models.qwen2 import Qwen2Model
from transformers.models.qwen2.tokenization_qwen2_fast import Qwen2TokenizerFast
from transformers.models.qwen2.configuration_qwen2 import Qwen2Config
INSTRUCTION_CONFIG = {
"nl2code": {
"query": "Find the most relevant code snippet given the following query:\n",
"passage": "Candidate code snippet:\n"
},
"qa": {
"query": "Find the most relevant answer given the following question:\n",
"passage": "Candidate answer:\n"
},
"code2code": {
"query": "Find an equivalent code snippet given the following code snippet:\n",
"passage": "Candidate code snippet:\n"
},
"code2nl": {
"query": "Find the most relevant comment given the following code snippet:\n",
"passage": "Candidate comment:\n"
},
"code2completion": {
"query": "Find the most relevant completion given the following start of code snippet:\n",
"passage": "Candidate completion:\n"
}
}
def batch(iterable, n=1):
items = len(iterable)
for ndx in range(0, items, n):
yield iterable[ndx : min(ndx + n, items)]
def last_token_pooling(model_output, attention_mask):
token_embeddings = model_output[0]
left_padding = (attention_mask[:, -1].sum() == attention_mask.shape[0])
if left_padding:
return token_embeddings[:, -1]
else:
sequence_lengths = attention_mask.sum(dim=1) - 1
batch_size = token_embeddings.shape[0]
return token_embeddings[torch.arange(batch_size, device=token_embeddings.device), sequence_lengths].float()
class JinaEmbeddingsC1Model(Qwen2Model):
def __init__(self, config: Qwen2Config):
Qwen2Model.__init__(self, config)
self.instructions = INSTRUCTION_CONFIG
def forward(
self,
input_ids: torch.LongTensor,
attention_mask: torch.Tensor,
**kwargs
) -> List[torch.Tensor]:
"""
Forward pass through the model.
"""
batch_model_output = super().forward(
input_ids=input_ids,
attention_mask=attention_mask,
**kwargs
)
batch_sentence_embeddings = last_token_pooling(
batch_model_output, attention_mask
)
return batch_sentence_embeddings
def encode(
self,
sentences: List[str],
batch_size: int = 32,
max_length: int = 32768,
task: str = "nl2code",
prompt_name: str = "query",
return_numpy: bool = False,
truncate_dim: int = 896,
) -> Union[np.ndarray, List[torch.Tensor]]:
"""
Encodes a list of texts into embeddings.
Args:
sentences: list of text strings to encode
batch_size: Number of texts to process at once
max_length: Maximum token length for text processing
task: Type of retrieval task ('nl2code', 'qa', or 'code2code')
prompt_name: Type of text being encoded ('query' or 'passage')
return_numpy: Whether to return numpy arrays instead of torch tensors
truncate_dim: Dimension to truncate embeddings to (64, 128, 256, 512, or 896)
Returns:
List of text embeddings as tensors or numpy arrays
"""
assert task in self.config.task_names, \
f"Invalid task: {task}. Must be one of {self.config.task_names}."
assert prompt_name in self.config.prompt_names, \
f"Invalid prompt name: {prompt_name}. Must be one of {self.config.prompt_names}."
assert truncate_dim in self.config.matryoshka_dims, \
f"Invalid embedding dimension: {truncate_dim}. Must be one of {self.config.matryoshka_dims}."
instruction = self.instructions[task][prompt_name]
sentences = [f'{instruction}{sentence}' for sentence in sentences]
embeddings = []
self.eval()
with torch.inference_mode():
for batch_of_sentences in batch(sentences, n=batch_size):
batch_encoded_input = self.tokenizer(
batch_of_sentences,
padding=True,
truncation=True,
return_tensors="pt",
max_length=max_length
).to(self.device)
batch_sentence_embeddings = self(
**batch_encoded_input,
output_attentions=False,
return_dict=True,
max_length=max_length
)
batch_sentence_embeddings = batch_sentence_embeddings[:, :truncate_dim]
batch_sentence_embeddings = torch.nn.functional.normalize(
batch_sentence_embeddings, p=2, dim=-1
).to("cpu")
embeddings.append(batch_sentence_embeddings)
if return_numpy:
return np.concatenate([b.numpy() for b in embeddings], axis=0)
return [t for b in embeddings for t in torch.unbind(b, dim=0)]
@classmethod
def from_pretrained(
cls,
pretrained_model_name_or_path,
*args,
**kwargs,
):
"""
Loads a pretrained model.
"""
if "torch_dtype" not in kwargs:
kwargs["torch_dtype"] = "auto"
if "attn_implementation" not in kwargs:
kwargs["attn_implementation"] = "flash_attention_2" if is_flash_attn_2_available() else "sdpa"
model = super().from_pretrained(
pretrained_model_name_or_path, *args, **kwargs
)
model.tokenizer = Qwen2TokenizerFast.from_pretrained(
pretrained_model_name_or_path,
trust_remote_code=True
)
return model