# Copyright (c) 2024, MeetKai Inc. All rights reserved.
from copy import deepcopy
import datetime
import json
from typing import Any, Dict, List, Literal, Optional, Union
import jsonref
from pydantic import BaseModel, Field, model_validator
from typing_extensions import Self
from transformers.tokenization_utils_base import BatchEncoding
from transformers.tokenization_utils_fast import PreTrainedTokenizerFast
from transformers.utils import TensorType, logging
logger = logging.get_logger(__name__)
def get_instruction_string(custom_tool_definition) -> str:
name, description = (
custom_tool_definition["name"],
custom_tool_definition["description"],
)
return f"Use the function '{name}' to '{description}'"
def get_parameters_string(custom_tool_definition) -> str:
return json.dumps(custom_tool_definition)
def get_system_prompt_for_custom_tools(custom_tools: List) -> str:
custom_tool_params = ""
for t in custom_tools:
custom_tool_params += get_instruction_string(t) + "\n"
custom_tool_params += get_parameters_string(t) + "\n\n"
content = f"""
You have access to the following functions:
{custom_tool_params}
Think very carefully before calling functions.
If a you choose to call a function ONLY reply in the following format:
<{{start_tag}}={{function_name}}>{{parameters}}{{end_tag}}
where
start_tag => ` a JSON dict with the function argument name as key and function argument value as value.
end_tag => ``
Here is an example,
{{"example_name": "example_value"}}
Reminder:
- If looking for real time information use relevant functions before falling back to brave_search
- Function calls MUST follow the specified format, start with
- Required parameters MUST be specified
- Only call one function at a time
- Put the entire function call reply on one line
"""
return content
def get_system_message_for_tools(tools: List[Dict], use_code_interpreter) -> List[Dict]:
content = ""
if use_code_interpreter:
content += "Environment: ipython\n"
current_date = datetime.datetime.now()
formatted_date = current_date.strftime("%d %B %Y")
date_str = f"""
Cutting Knowledge Date: December 2023\n\n"""
content += date_str
if tools:
custom_message = get_system_prompt_for_custom_tools(tools)
content += custom_message
return {"role": "system", "content": content}
class FunctionaryTokenizer(PreTrainedTokenizerFast):
def apply_chat_template(
self,
conversation: Union[List[Dict[str, str]], List[List[Dict[str, str]]], str],
tools: Optional[List[Dict[str, Any]]],
chat_template: Optional[str] = None,
add_generation_prompt: bool = False,
tokenize: bool = True,
padding: bool = False,
truncation: bool = False,
max_length: Optional[int] = None,
return_tensors: Optional[Union[str, TensorType]] = None,
return_dict: bool = False,
tokenizer_kwargs: Optional[Dict[str, Any]] = None,
**kwargs,
) -> Union[str, List[int], List[str], List[List[int]], BatchEncoding]:
if return_dict and not tokenize:
raise ValueError(
"`return_dict=True` is incompatible with `tokenize=False`, because there is no dict "
"of tokenizer outputs to return."
)
if tokenizer_kwargs is None:
tokenizer_kwargs = {}
using_default_template = False
# First, handle the cases when the model has a dict of multiple templates
if isinstance(self.chat_template, dict) or (
self.chat_template is None and isinstance(self.default_chat_template, dict)
):
if self.chat_template is not None:
template_dict = self.chat_template
using_default_dict = False
else:
template_dict = self.default_chat_template
using_default_dict = True
if chat_template is not None and chat_template in template_dict:
# The user can pass the name of a template to the chat template argument instead of an entire template
chat_template = template_dict[chat_template]
if using_default_dict:
using_default_template = True
elif chat_template is None and "default" in template_dict:
chat_template = template_dict["default"]
if using_default_dict:
using_default_template = True
elif chat_template is None:
raise ValueError(
"This model has multiple chat templates with no default specified! Please either pass a chat "
"template or the name of the template you wish to use to the `chat_template` argument. Available "
f"template names are {sorted(template_dict.keys())}."
)
elif chat_template is None:
# These are the cases when the model has a single template
# priority: `chat_template` argument > `tokenizer.chat_template` > `tokenizer.default_chat_template
if self.chat_template is not None:
chat_template = self.chat_template
else:
chat_template = self.default_chat_template
using_default_template = True
if using_default_template:
logger.warning_once(
"No chat template is set for this tokenizer, falling back to a default class-level template. This is "
"very error-prone, because models are often trained with templates different from the class default! "
"Default chat templates are a legacy feature and will be removed in Transformers v4.43, at which "
"point any code depending on them will stop working. We recommend setting a valid chat template before "
"then to ensure that this model continues working without issues."
)
# Prepare tools/functions into schema
functions_pydantic_to_render = []
has_code_interpreter = False
if tools is not None:
for item in tools:
if "function" in item and item["function"] is not None:
functions_pydantic_to_render.append(item["function"])
elif "type" in item and item["type"] == "code_interpreter":
has_code_interpreter = True
else:
functions_pydantic_to_render.append(item)
tools_system_message = get_system_message_for_tools(functions_pydantic_to_render, has_code_interpreter)
conversation.insert(0, tools_system_message)
# Compilation function uses a cache to avoid recompiling the same template
compiled_template = self._compile_jinja_template(chat_template)
if isinstance(conversation, (list, tuple)) and (
isinstance(conversation[0], (list, tuple)) or hasattr(conversation[0], "messages")
):
conversations = conversation
is_batched = True
else:
conversations = [conversation]
is_batched = False
rendered = []
template_kwargs = {**self.special_tokens_map, **kwargs} # kwargs overwrite special tokens if both are present
for chat in conversations:
if hasattr(chat, "messages"):
# Indicates it's a Conversation object
chat = chat.messages
rendered_chat = compiled_template.render(
messages=chat, add_generation_prompt=add_generation_prompt, **template_kwargs
)
rendered.append(rendered_chat)
if not is_batched:
rendered = rendered[0]
if tokenize:
out = self(
rendered,
padding=padding,
truncation=truncation,
max_length=max_length,
add_special_tokens=False,
return_tensors=return_tensors,
**tokenizer_kwargs,
)
if return_dict:
return out
else:
return out["input_ids"]
else:
return rendered