Source code for easydel.inference.tools.parsers.internlm2_tool_parser

# Copyright 2025 The EasyDeL Author @erfanzar (Erfan Zare Chavoshi).
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations

import json
from collections.abc import Sequence
from uuid import uuid4

import partial_json_parser
from eformer.loggings import get_logger
from partial_json_parser.core.options import Allow
from transformers import AutoTokenizer as AnyTokenizer

from ...openai_api_modules import (
    ChatCompletionRequest,
    DeltaFunctionCall,
    DeltaMessage,
    DeltaToolCall,
    ExtractedToolCallInformation,
    FunctionCall,
    ToolCall,
)
from ..abstract_tool import ToolParser, ToolParserManager
from ..utils import extract_intermediate_diff

logger = get_logger(__name__)


[docs]@ToolParserManager.register_module(["internlm"]) class Internlm2ToolParser(ToolParser): """ Tool parser for InternLM2 models. Handles action/plugin calls with special tokens: <|action_start|><|plugin|>{...}<|action_end|> Features: - Position-based streaming parser - Supports both 'parameters' and 'arguments' fields - Adjusts request settings for special tokens - Handles partial JSON with incremental diff extraction The parser maintains a cursor position to track progress through the output stream and properly handle action boundaries. """ def __init__(self, tokenizer: AnyTokenizer): super().__init__(tokenizer) self.position = 0
[docs] def adjust_request(self, request: ChatCompletionRequest) -> ChatCompletionRequest: if request.tools and request.tool_choice != "none": request.skip_special_tokens = False return request
[docs] def get_arguments(self, obj): if "parameters" in obj: return obj.get("parameters") elif "arguments" in obj: return obj.get("arguments") return None
[docs] def extract_tool_calls_streaming( self, previous_text: str, current_text: str, delta_text: str, previous_token_ids: Sequence[int], current_token_ids: Sequence[int], delta_token_ids: Sequence[int], request: ChatCompletionRequest, ) -> DeltaMessage | None: if "<|action_start|>" not in current_text: self.position = len(current_text) return DeltaMessage(content=delta_text) if self.current_tool_id > 0: return DeltaMessage(content="") last_pos = self.position if "<|action_start|><|plugin|>" not in current_text[last_pos:]: return None new_delta = current_text[last_pos:] text, action = new_delta.split("<|action_start|><|plugin|>") if len(text) > 0: self.position = self.position + len(text) return DeltaMessage(content=text) action = action.strip() action = action.split("<|action_end|>".strip())[0] flags = Allow.ALL if self.current_tool_name_sent else Allow.ALL & ~Allow.STR try: parsable_arr = action try: tool_call_arr: dict = partial_json_parser.loads(parsable_arr, flags) except partial_json_parser.core.exceptions.MalformedJSON: logger.debug("not enough tokens to parse into JSON yet") return None if not self.current_tool_name_sent: function_name = tool_call_arr.get("name") if function_name: self.current_tool_id = self.current_tool_id + 1 delta = DeltaMessage( tool_calls=[ DeltaToolCall( index=self.current_tool_id, type="function", id=f"chatcmpl-tool-{uuid4()}", function=DeltaFunctionCall(name=function_name).model_dump(exclude_none=True), ) ] ) self.current_tool_name_sent = True self.streamed_args_for_tool.append("") else: delta = None else: prev_arguments = self.get_arguments(self.prev_tool_call_arr[self.current_tool_id]) cur_arguments = self.get_arguments(tool_call_arr) if not cur_arguments and not prev_arguments: delta = None elif not cur_arguments and prev_arguments: logger.error("INVARIANT - impossible to have arguments reset mid-arguments") delta = None elif cur_arguments and not prev_arguments: cur_arguments_json = json.dumps(cur_arguments, ensure_ascii=False) arguments_delta = cur_arguments_json[: cur_arguments_json.index(delta_text) + len(delta_text)] delta = DeltaMessage( tool_calls=[ DeltaToolCall( index=self.current_tool_id, function=DeltaFunctionCall(arguments=arguments_delta).model_dump(exclude_none=True), ) ] ) self.streamed_args_for_tool[self.current_tool_id] += arguments_delta elif cur_arguments and prev_arguments: cur_args_json = json.dumps(cur_arguments, ensure_ascii=False) prev_args_json = json.dumps(prev_arguments, ensure_ascii=False) argument_diff = extract_intermediate_diff(cur_args_json, prev_args_json) delta = DeltaMessage( tool_calls=[ DeltaToolCall( index=self.current_tool_id, function=DeltaFunctionCall(arguments=argument_diff).model_dump(exclude_none=True), ) ] ) self.streamed_args_for_tool[self.current_tool_id] += argument_diff tool_call_arr["arguments"] = self.get_arguments(tool_call_arr) self.prev_tool_call_arr = [tool_call_arr] return delta except Exception: logger.exception("Error trying to handle streaming tool call.") logger.debug("Skipping chunk as a result of tool streaming extraction error") return None
[docs] def extract_tool_calls( self, model_output: str, request: ChatCompletionRequest, ) -> ExtractedToolCallInformation: text = model_output tools = request.tools if "<|action_start|><|plugin|>" in text: text, action = text.split("<|action_start|><|plugin|>") action = action.split("<|action_end|>".strip())[0] action = action[action.find("{") :] action_dict = json.loads(action) name, parameters = ( action_dict["name"], json.dumps(action_dict.get("parameters", action_dict.get("arguments", {})), ensure_ascii=False), ) if not tools or name not in [t.function.name for t in tools]: ExtractedToolCallInformation(tools_called=False, tool_calls=[], content=text) tool_calls = [ToolCall(function=FunctionCall(name=name, arguments=parameters))] return ExtractedToolCallInformation( tools_called=True, tool_calls=tool_calls, content=text if len(text) > 0 else None ) return ExtractedToolCallInformation(tools_called=False, tool_calls=[], content=text)