# Source code for easydel.inference.tools.parsers.internlm2_tool_parser
# Copyright 2025 The EasyDeL Author @erfanzar (Erfan Zare Chavoshi).
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations
import json
from collections.abc import Sequence
from uuid import uuid4
import partial_json_parser
from eformer.loggings import get_logger
from partial_json_parser.core.options import Allow
from transformers import AutoTokenizer as AnyTokenizer
from ...openai_api_modules import (
ChatCompletionRequest,
DeltaFunctionCall,
DeltaMessage,
DeltaToolCall,
ExtractedToolCallInformation,
FunctionCall,
ToolCall,
)
from ..abstract_tool import ToolParser, ToolParserManager
from ..utils import extract_intermediate_diff
logger = get_logger(__name__)
@ToolParserManager.register_module(["internlm"])
class Internlm2ToolParser(ToolParser):
    """
    Tool parser for InternLM2 models.

    Handles action/plugin calls with special tokens:
        <|action_start|><|plugin|>{...}<|action_end|>

    Features:
        - Position-based streaming parser
        - Supports both 'parameters' and 'arguments' fields
        - Adjusts request settings for special tokens
        - Handles partial JSON with incremental diff extraction

    The parser maintains a cursor position to track progress through
    the output stream and properly handle action boundaries.

    NOTE(review): ``current_tool_id``, ``current_tool_name_sent``,
    ``streamed_args_for_tool`` and ``prev_tool_call_arr`` are read and written
    here but initialised by ``ToolParser`` (not visible in this file) — confirm
    their initial values against the base class.
    """

    # Marker strings emitted by the model around a plugin (tool) call.
    _ACTION_START = "<|action_start|>"
    _PLUGIN_START = "<|action_start|><|plugin|>"
    _ACTION_END = "<|action_end|>"

    def __init__(self, tokenizer: AnyTokenizer):
        """Initialise the base parser and reset the streaming cursor to 0."""
        super().__init__(tokenizer)
        # Offset into ``current_text`` up to which plain content has already
        # been handed back to the caller by the streaming extractor.
        self.position = 0

    def adjust_request(self, request: ChatCompletionRequest) -> ChatCompletionRequest:
        """
        Keep special tokens in the decoded output when tools are in play.

        When the request carries tools and ``tool_choice`` is not ``"none"``,
        ``skip_special_tokens`` is set to ``False`` on the request so that the
        action markers this parser searches for remain in the text.

        Returns:
            The same ``request`` object (mutated in place when applicable).
        """
        if request.tools and request.tool_choice != "none":
            request.skip_special_tokens = False
        return request

    def get_arguments(self, obj):
        """
        Return the call arguments stored in a parsed action object.

        ``"parameters"`` takes precedence over ``"arguments"``; ``None`` is
        returned when neither key is present.
        """
        for key in ("parameters", "arguments"):
            if key in obj:
                return obj.get(key)
        return None

    def extract_tool_calls_streaming(
        self,
        previous_text: str,
        current_text: str,
        delta_text: str,
        previous_token_ids: Sequence[int],
        current_token_ids: Sequence[int],
        delta_token_ids: Sequence[int],
        request: ChatCompletionRequest,
    ) -> DeltaMessage | None:
        """
        Convert the newest streamed chunk into a content or tool-call delta.

        Args:
            previous_text: Unused by this parser.
            current_text: Full text generated so far.
            delta_text: Text added by the latest chunk.
            previous_token_ids: Unused by this parser.
            current_token_ids: Unused by this parser.
            delta_token_ids: Unused by this parser.
            request: Unused by this parser.

        Returns:
            A ``DeltaMessage`` carrying plain content, the tool name, or an
            arguments fragment; ``None`` when there is nothing to emit yet
            (incomplete JSON, no name/arguments available, or an internal
            error, which is logged and swallowed).
        """
        # No action marker at all: everything is ordinary content.
        if self._ACTION_START not in current_text:
            self.position = len(current_text)
            return DeltaMessage(content=delta_text)

        # Once the tool index is above zero, only empty content is emitted.
        if self.current_tool_id > 0:
            return DeltaMessage(content="")

        pending = current_text[self.position :]
        # "<|action_start|>" was seen but the full plugin marker has not
        # arrived after the cursor yet; wait for more text.
        if self._PLUGIN_START not in pending:
            return None

        # maxsplit=1 keeps the two-value unpacking safe even when the marker
        # appears more than once in the pending text.
        text, action = pending.split(self._PLUGIN_START, 1)
        if text:
            # Flush plain content that precedes the action before parsing it.
            self.position = self.position + len(text)
            return DeltaMessage(content=text)

        action = action.strip().split(self._ACTION_END)[0]

        # Until the tool name has been sent, partial strings are disallowed so
        # that a half-generated name is never emitted.
        flags = Allow.ALL if self.current_tool_name_sent else Allow.ALL & ~Allow.STR

        try:
            try:
                tool_call_arr: dict = partial_json_parser.loads(action, flags)
            except partial_json_parser.core.exceptions.MalformedJSON:
                logger.debug("not enough tokens to parse into JSON yet")
                return None

            if not self.current_tool_name_sent:
                # Phase 1: emit the function name as soon as it is available.
                function_name = tool_call_arr.get("name")
                if function_name:
                    self.current_tool_id = self.current_tool_id + 1
                    delta = DeltaMessage(
                        tool_calls=[
                            DeltaToolCall(
                                index=self.current_tool_id,
                                type="function",
                                id=f"chatcmpl-tool-{uuid4()}",
                                function=DeltaFunctionCall(name=function_name).model_dump(exclude_none=True),
                            )
                        ]
                    )
                    self.current_tool_name_sent = True
                    self.streamed_args_for_tool.append("")
                else:
                    delta = None
            else:
                # Phase 2: stream the arguments as they grow between chunks.
                prev_arguments = self.get_arguments(self.prev_tool_call_arr[self.current_tool_id])
                cur_arguments = self.get_arguments(tool_call_arr)

                if not cur_arguments and not prev_arguments:
                    delta = None
                elif not cur_arguments and prev_arguments:
                    logger.error("INVARIANT - impossible to have arguments reset mid-arguments")
                    delta = None
                elif cur_arguments and not prev_arguments:
                    # First sight of arguments: send the serialised JSON up to
                    # and including the latest delta. ``str.index`` raises
                    # ValueError when ``delta_text`` is not found verbatim;
                    # that is handled by the outer ``except`` below.
                    cur_arguments_json = json.dumps(cur_arguments, ensure_ascii=False)
                    arguments_delta = cur_arguments_json[: cur_arguments_json.index(delta_text) + len(delta_text)]
                    delta = DeltaMessage(
                        tool_calls=[
                            DeltaToolCall(
                                index=self.current_tool_id,
                                function=DeltaFunctionCall(arguments=arguments_delta).model_dump(exclude_none=True),
                            )
                        ]
                    )
                    self.streamed_args_for_tool[self.current_tool_id] += arguments_delta
                elif cur_arguments and prev_arguments:
                    # Subsequent chunks: send only what changed since last time.
                    cur_args_json = json.dumps(cur_arguments, ensure_ascii=False)
                    prev_args_json = json.dumps(prev_arguments, ensure_ascii=False)
                    argument_diff = extract_intermediate_diff(cur_args_json, prev_args_json)
                    delta = DeltaMessage(
                        tool_calls=[
                            DeltaToolCall(
                                index=self.current_tool_id,
                                function=DeltaFunctionCall(arguments=argument_diff).model_dump(exclude_none=True),
                            )
                        ]
                    )
                    self.streamed_args_for_tool[self.current_tool_id] += argument_diff

            # Normalise to an "arguments" key and remember this snapshot for
            # the next chunk's diff.
            tool_call_arr["arguments"] = self.get_arguments(tool_call_arr)
            self.prev_tool_call_arr = [tool_call_arr]
            return delta
        except Exception:
            logger.exception("Error trying to handle streaming tool call.")
            logger.debug("Skipping chunk as a result of tool streaming extraction error")
            return None

    def extract_tool_calls(
        self,
        model_output: str,
        request: ChatCompletionRequest,
    ) -> ExtractedToolCallInformation:
        """
        Extract a single plugin call from a complete model response.

        Args:
            model_output: The full generated text.
            request: The originating request; ``request.tools`` is used to
                validate the called function name.

        Returns:
            ``ExtractedToolCallInformation`` with ``tools_called=True`` and one
            ``ToolCall`` when a well-formed action naming a requested tool is
            found. Otherwise ``tools_called=False`` with:
              * the full output as content when there is no plugin marker or
                the action payload cannot be parsed;
              * the text preceding the action as content when the named tool
                is not among ``request.tools``.
        """
        if self._PLUGIN_START not in model_output:
            return ExtractedToolCallInformation(tools_called=False, tool_calls=[], content=model_output)

        # maxsplit=1: only the first action is considered, and a repeated
        # marker can no longer break the two-value unpacking.
        text, action = model_output.split(self._PLUGIN_START, 1)
        action = action.split(self._ACTION_END)[0]

        # Drop anything before the JSON object. If there is no "{" at all the
        # payload is treated as empty, which json.loads rejects below.
        brace_at = action.find("{")
        payload = action[brace_at:] if brace_at != -1 else ""

        try:
            action_dict = json.loads(payload)
            name = action_dict["name"]
        except (json.JSONDecodeError, KeyError):
            # Malformed or incomplete action: surface the raw output as content
            # rather than failing the whole response.
            logger.exception("Error in extracting tool call from response.")
            return ExtractedToolCallInformation(tools_called=False, tool_calls=[], content=model_output)

        parameters = json.dumps(
            action_dict.get("parameters", action_dict.get("arguments", {})),
            ensure_ascii=False,
        )

        tools = request.tools
        if not tools or name not in [t.function.name for t in tools]:
            # The model called something that was not offered: report no call.
            return ExtractedToolCallInformation(tools_called=False, tool_calls=[], content=text)

        tool_calls = [ToolCall(function=FunctionCall(name=name, arguments=parameters))]
        return ExtractedToolCallInformation(
            tools_called=True,
            tool_calls=tool_calls,
            content=text if len(text) > 0 else None,
        )