Skip to content

Commit 2f1c5de

Browse files
committed
fix(langchain): normalize tool definitions and tool call serialization for Anthropic and OpenAI providers
- Add `_to_langfuse_tool` helper to normalize tool definitions into the OpenAI canonical format, handling Anthropic's `input_schema` shape
- Add `_convert_tool_call` helper to unify `tool_calls` and `invalid_tool_calls` conversion, supporting both the LangChain (`args`) and Anthropic streaming (`input`) formats
- Structure `on_llm_start` input as `{messages, tools}` so the backend's `extractToolsFromObservation` can find tool definitions at the top-level `tools` key
- Add `_normalize_anthropic_content_blocks` to strip streaming artifacts (`index`, `partial_json`) from Anthropic `tool_use` content blocks and fill empty `input` from `message.tool_calls`
- Add unit tests for all helpers and update `test_callback_openai_functions_with_tools` for the new input structure
1 parent caddeff commit 2f1c5de

2 files changed

Lines changed: 309 additions & 19 deletions

File tree

langfuse/langchain/CallbackHandler.py

Lines changed: 148 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import json
12
from contextvars import Token
23
from typing import (
34
Any,
@@ -35,6 +36,110 @@
3536
from langfuse.logger import langfuse_logger
3637
from langfuse.types import TraceContext
3738

39+
40+
def _to_langfuse_tool(tool: Any) -> Any:
41+
"""Normalize a tool definition to OpenAI format: {type, function: {name, description, parameters}}.
42+
43+
LangChain providers serialize tools differently depending on the backend:
44+
- Anthropic (ChatAnthropic): {name, description, input_schema}
45+
- OpenAI / LiteLLM: {type: "function", function: {name, description, parameters}}
46+
47+
Langfuse's extractTools uses OpenAIToolSchema to parse tools from the top-level
48+
"tools" key in the observation input. Both provider formats are normalized here
49+
into that canonical OpenAI shape so the schema parse succeeds.
50+
"""
51+
if not isinstance(tool, dict):
52+
return tool
53+
# Already in OpenAI format: {type: "function", function: {name, description, parameters}}
54+
if tool.get("type") == "function" and "function" in tool:
55+
return tool
56+
# Anthropic format: {name, description, input_schema} -> OpenAI format
57+
if "name" in tool and "input_schema" in tool:
58+
return {
59+
"type": "function",
60+
"function": {
61+
"name": tool["name"],
62+
"description": tool.get("description", ""),
63+
"parameters": tool["input_schema"],
64+
},
65+
}
66+
return tool
67+
68+
69+
def _normalize_anthropic_content_blocks(
70+
content: List[Any], tool_calls: List[Dict[str, Any]]
71+
) -> List[Any]:
72+
"""Remove streaming artifacts from Anthropic content blocks.
73+
74+
Anthropic streaming leaves tool_use blocks with ``input: {}`` and
75+
streaming-specific fields (``index``, ``partial_json``). The actual
76+
arguments are already reconstructed in ``message.tool_calls``. This
77+
helper fills the empty ``input`` from the normalized tool_calls and
78+
strips the streaming-only keys so the block looks like a proper
79+
Anthropic content block.
80+
"""
81+
if not tool_calls:
82+
return content
83+
tc_by_id: Dict[str, Any] = {
84+
tc["id"]: tc.get("args", {})
85+
for tc in tool_calls
86+
if isinstance(tc, dict) and "id" in tc
87+
}
88+
tc_by_name: Dict[str, Any] = {
89+
tc["name"]: tc.get("args", {})
90+
for tc in tool_calls
91+
if isinstance(tc, dict) and "name" in tc
92+
}
93+
result = []
94+
for block in content:
95+
if isinstance(block, dict) and block.get("type") == "tool_use":
96+
block_input = block.get("input") or {}
97+
if not block_input:
98+
block_input = (
99+
tc_by_id.get(block.get("id", ""))
100+
or tc_by_name.get(block.get("name", ""))
101+
or {}
102+
)
103+
normalized = {
104+
k: v for k, v in block.items() if k not in ("index", "partial_json")
105+
}
106+
normalized["input"] = block_input
107+
result.append(normalized)
108+
else:
109+
result.append(block)
110+
return result
111+
112+
113+
def _convert_tool_call(tc: Any, include_error: bool = False) -> Optional[Dict[str, Any]]:
114+
"""Convert a single tool call dict to Langfuse's canonical format.
115+
116+
Handles both LangChain format {name, args, id} and Anthropic streaming
117+
format {type: "tool_use", name, input, id}.
118+
119+
Returns None (and logs a debug message) if tc is not a dict.
120+
Set include_error=True for invalid_tool_calls entries.
121+
"""
122+
if not isinstance(tc, dict):
123+
langfuse_logger.debug("Skipping tool_call entry that is not a dict: %s", tc)
124+
return None
125+
# Anthropic streaming uses "input" instead of "args"
126+
args = tc.get("args") or tc.get("input") or {}
127+
try:
128+
arguments = json.dumps(args)
129+
except (TypeError, ValueError) as e:
130+
langfuse_logger.debug("Failed to serialize tool call args to JSON: %s", e)
131+
arguments = "{}"
132+
result: Dict[str, Any] = {
133+
"id": tc.get("id", ""),
134+
"type": "function",
135+
"name": tc.get("name", ""),
136+
"arguments": arguments,
137+
}
138+
if include_error:
139+
result["error"] = tc.get("error", "")
140+
return result
141+
142+
38143
try:
39144
import langchain
40145

@@ -841,9 +946,16 @@ def __on_llm_action(
841946
self._child_to_parent_run_id_map[run_id] = parent_run_id
842947

843948
try:
949+
observation_input: Any = prompts
844950
tools = kwargs.get("invocation_params", {}).get("tools", None)
845951
if tools and isinstance(tools, list):
846-
prompts.extend([{"role": "tool", "content": tool} for tool in tools])
952+
# Structure input as {messages, tools} so extractToolsFromObservation
953+
# can find tool definitions at the top-level tools key — the canonical
954+
# format expected by the backend's LLMToolDefinitionSchema.
955+
observation_input = {
956+
"messages": prompts,
957+
"tools": [_to_langfuse_tool(t) for t in tools],
958+
}
847959

848960
model_name = self._parse_model_and_log_errors(
849961
serialized=serialized, metadata=metadata, kwargs=kwargs
@@ -868,7 +980,7 @@ def __on_llm_action(
868980

869981
content = {
870982
"name": self.get_langchain_run_name(serialized, **kwargs),
871-
"input": prompts,
983+
"input": observation_input,
872984
"metadata": self.__join_tags_and_metadata(
873985
tags,
874986
metadata,
@@ -1049,21 +1161,47 @@ def _convert_message_to_dict(self, message: BaseMessage) -> Dict[str, Any]:
10491161
if isinstance(message, HumanMessage):
10501162
message_dict: Dict[str, Any] = {"role": "user", "content": message.content}
10511163
elif isinstance(message, AIMessage):
1052-
message_dict = {"role": "assistant", "content": message.content}
1053-
1054-
if (
1164+
# Normalize Anthropic content blocks: fill empty tool_use input from
1165+
# message.tool_calls and strip streaming artifacts (index, partial_json).
1166+
content = message.content
1167+
lc_tool_calls = (
1168+
list(message.tool_calls)
1169+
if hasattr(message, "tool_calls") and message.tool_calls
1170+
else []
1171+
)
1172+
if isinstance(content, list) and lc_tool_calls:
1173+
content = _normalize_anthropic_content_blocks(content, lc_tool_calls)
1174+
message_dict = {"role": "assistant", "content": content}
1175+
1176+
# Resolve tool_calls: prefer LangChain's normalized {name, args, id}
1177+
# format; fall back to additional_kwargs["tool_calls"] which contains
1178+
# Anthropic's raw {type: "tool_use", name, input, id} format when
1179+
# streaming is used and message.tool_calls is not populated.
1180+
raw_tool_calls = message.tool_calls if (
10551181
hasattr(message, "tool_calls")
10561182
and message.tool_calls is not None
10571183
and len(message.tool_calls) > 0
1058-
):
1059-
message_dict["tool_calls"] = message.tool_calls
1184+
) else message.additional_kwargs.get("tool_calls") or []
1185+
1186+
if raw_tool_calls:
1187+
converted_tool_calls = [
1188+
r for tc in raw_tool_calls if (r := _convert_tool_call(tc)) is not None
1189+
]
1190+
if converted_tool_calls:
1191+
message_dict["tool_calls"] = converted_tool_calls
10601192

10611193
if (
1062-
hasattr(message, "invalid_tool_calls")
1063-
and message.invalid_tool_calls is not None
1194+
hasattr(message, "invalid_tool_calls")
1195+
and message.invalid_tool_calls is not None
10641196
and len(message.invalid_tool_calls) > 0
10651197
):
1066-
message_dict["invalid_tool_calls"] = message.invalid_tool_calls
1198+
converted_invalid_tool_calls = [
1199+
r
1200+
for tc in message.invalid_tool_calls
1201+
if (r := _convert_tool_call(tc, include_error=True)) is not None
1202+
]
1203+
if converted_invalid_tool_calls:
1204+
message_dict["invalid_tool_calls"] = converted_invalid_tool_calls
10671205

10681206
elif isinstance(message, SystemMessage):
10691207
message_dict = {"role": "system", "content": message.content}

tests/test_langchain.py

Lines changed: 161 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,157 @@
1818

1919
from langfuse._client.client import Langfuse
2020
from langfuse.langchain import CallbackHandler
21+
from langfuse.langchain.CallbackHandler import (
22+
_convert_tool_call,
23+
_normalize_anthropic_content_blocks,
24+
_to_langfuse_tool,
25+
)
2126
from tests.utils import create_uuid, encode_file_to_base64, get_api
2227

2328

29+
# --- Unit tests for _to_langfuse_tool ---
30+
31+
32+
def test_to_langfuse_tool_openai_format():
    """An OpenAI-shaped tool is already canonical and must pass through unchanged."""
    openai_tool = {
        "type": "function",
        "function": {
            "name": "get_weather",
            "description": "Get the weather",
            "parameters": {"type": "object", "properties": {}},
        },
    }
    assert _to_langfuse_tool(openai_tool) == openai_tool
44+
45+
46+
def test_to_langfuse_tool_anthropic_format():
    """An Anthropic {name, description, input_schema} tool is wrapped into OpenAI form."""
    schema = {"type": "object", "properties": {}}
    anthropic_tool = {
        "name": "get_weather",
        "description": "Get the weather",
        "input_schema": schema,
    }
    expected = {
        "type": "function",
        "function": {
            "name": "get_weather",
            "description": "Get the weather",
            "parameters": schema,
        },
    }
    assert _to_langfuse_tool(anthropic_tool) == expected
61+
62+
63+
def test_to_langfuse_tool_passthrough_unknown_dict():
    """A dict matching neither provider shape is returned unchanged."""
    unrecognized = {"name": "my_tool", "custom_field": "value"}
    assert _to_langfuse_tool(unrecognized) == unrecognized
67+
68+
69+
def test_to_langfuse_tool_passthrough_non_dict():
    """Non-dict values are returned as-is rather than raising."""
    assert _to_langfuse_tool("not a dict") == "not a dict"
72+
73+
74+
# --- Unit tests for _convert_tool_call ---
75+
76+
77+
def test_convert_tool_call_langchain_format():
    """LangChain's {name, args, id} shape converts with JSON-encoded arguments."""
    converted = _convert_tool_call(
        {"name": "get_weather", "args": {"city": "Berlin"}, "id": "call_1"}
    )
    assert converted == {
        "id": "call_1",
        "type": "function",
        "name": "get_weather",
        "arguments": '{"city": "Berlin"}',
    }
86+
87+
88+
def test_convert_tool_call_anthropic_streaming_format():
    """Anthropic streaming's "input" field is treated like LangChain's "args"."""
    converted = _convert_tool_call(
        {
            "type": "tool_use",
            "name": "get_weather",
            "input": {"city": "Berlin"},
            "id": "toolu_1",
        }
    )
    assert converted == {
        "id": "toolu_1",
        "type": "function",
        "name": "get_weather",
        "arguments": '{"city": "Berlin"}',
    }
98+
99+
100+
def test_convert_tool_call_include_error():
    """With include_error=True the entry's error field is carried through."""
    converted = _convert_tool_call(
        {"name": "bad_tool", "args": {}, "id": "call_2", "error": "invalid input"},
        include_error=True,
    )
    assert converted == {
        "id": "call_2",
        "type": "function",
        "name": "bad_tool",
        "arguments": "{}",
        "error": "invalid input",
    }
110+
111+
112+
def test_convert_tool_call_non_dict_returns_none():
    """Anything that is not a dict is skipped by returning None."""
    assert _convert_tool_call("not a dict") is None
114+
115+
116+
# --- Unit tests for _normalize_anthropic_content_blocks ---
117+
118+
119+
def test_normalize_anthropic_content_blocks_fills_empty_input():
    """Streaming leaves input:{} plus index/partial_json — fill input, drop artifacts."""
    streamed_block = {
        "type": "tool_use",
        "id": "toolu_abc",
        "name": "get_weather",
        "input": {},
        "index": 0,
        "partial_json": ['{"city": "Berlin"}'],
    }
    matching_call = {"id": "toolu_abc", "name": "get_weather", "args": {"city": "Berlin"}}

    normalized = _normalize_anthropic_content_blocks([streamed_block], [matching_call])

    assert normalized == [
        {
            "type": "tool_use",
            "id": "toolu_abc",
            "name": "get_weather",
            "input": {"city": "Berlin"},
        }
    ]
136+
137+
138+
def test_normalize_anthropic_content_blocks_preserves_non_empty_input():
    """A populated input wins over tool_calls args; streaming keys still stripped."""
    populated_block = {
        "type": "tool_use",
        "id": "toolu_abc",
        "name": "get_weather",
        "input": {"city": "Berlin"},
        "index": 0,
    }
    conflicting_call = {"id": "toolu_abc", "name": "get_weather", "args": {"city": "Paris"}}

    normalized = _normalize_anthropic_content_blocks([populated_block], [conflicting_call])

    assert normalized == [
        {
            "type": "tool_use",
            "id": "toolu_abc",
            "name": "get_weather",
            "input": {"city": "Berlin"},
        }
    ]
154+
155+
156+
def test_normalize_anthropic_content_blocks_ignores_non_tool_use():
    """Text content blocks pass through unchanged."""
    text_only = [{"type": "text", "text": "hello", "index": 0}]
    assert _normalize_anthropic_content_blocks(text_only, []) == text_only
161+
162+
163+
def test_normalize_anthropic_content_blocks_no_tool_calls_passthrough():
    """With no tool_calls to match against, the very same list object is returned."""
    blocks = [{"type": "tool_use", "id": "x", "name": "f", "input": {}}]
    assert _normalize_anthropic_content_blocks(blocks, []) is blocks
167+
168+
169+
# --- End unit tests ---
170+
171+
24172
def test_callback_generated_from_trace_chat():
25173
langfuse = Langfuse()
26174

@@ -762,15 +910,19 @@ class GetWeather(BaseModel):
762910

763911
for generation in generations:
764912
assert generation.input is not None
765-
tool_messages = [msg for msg in generation.input if msg["role"] == "tool"]
766-
assert len(tool_messages) == 2
767-
assert any(
768-
"standardize_address" == msg["content"]["function"]["name"]
769-
for msg in tool_messages
770-
)
771-
assert any(
772-
"get_weather" == msg["content"]["function"]["name"] for msg in tool_messages
773-
)
913+
# Input is structured as {messages, tools} for extractToolsFromObservation
914+
assert "messages" in generation.input
915+
assert "tools" in generation.input
916+
# Each tool must conform to OpenAI format (what extractTools parses)
917+
for t in generation.input["tools"]:
918+
assert t.get("type") == "function"
919+
assert "function" in t
920+
assert "name" in t["function"]
921+
assert "description" in t["function"]
922+
assert "parameters" in t["function"]
923+
tool_names = [t["function"]["name"] for t in generation.input["tools"]]
924+
assert "standardize_address" in tool_names
925+
assert "get_weather" in tool_names
774926

775927
assert generation.output is not None
776928

0 commit comments

Comments
 (0)