.sampo/changesets/add-ai-stop-reason.md (5 additions, 0 deletions)
@@ -0,0 +1,5 @@
---
pypi/posthog: minor
---

feat(ai): add $ai_stop_reason extraction for all providers
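For context, a minimal sketch of what the new property could look like on a captured $ai_generation event; every key and value here except $ai_stop_reason itself is illustrative, not taken from this PR:

    # Hypothetical event properties; only $ai_stop_reason is new in this PR.
    properties = {
        "$ai_model": "claude-sonnet-4",   # illustrative model name
        "$ai_latency": 1.23,              # illustrative latency in seconds
        "$ai_stop_reason": "end_turn",    # e.g. "end_turn", "max_tokens", "tool_use"
    }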
posthog/ai/anthropic/anthropic.py (13 additions, 0 deletions)
@@ -131,6 +131,7 @@ def _create_streaming(
content_blocks: List[StreamingContentBlock] = []
tools_in_progress: Dict[str, ToolInProgress] = {}
current_text_block: Optional[StreamingContentBlock] = None
stop_reason: Optional[str] = None
response = super().create(**kwargs)

def generator():
@@ -139,6 +140,7 @@ def generator():
nonlocal content_blocks
nonlocal tools_in_progress
nonlocal current_text_block
nonlocal stop_reason

try:
for event in response:
@@ -181,6 +183,14 @@ def generator():
event, content_blocks, tools_in_progress
)

# Capture stop reason from message_delta events
if hasattr(event, "type") and event.type == "message_delta":
delta = getattr(event, "delta", None)
if delta is not None:
delta_stop_reason = getattr(delta, "stop_reason", None)
if delta_stop_reason is not None:
stop_reason = delta_stop_reason

yield event

finally:
@@ -198,6 +208,7 @@ def generator():
latency,
content_blocks,
accumulated_content,
stop_reason=stop_reason,
)

return generator()
@@ -214,6 +225,7 @@ def _capture_streaming_event(
latency: float,
content_blocks: List[StreamingContentBlock],
accumulated_content: str,
stop_reason: Optional[str] = None,
):
from posthog.ai.types import StreamingEventData
from posthog.ai.anthropic.anthropic_converter import (
@@ -242,6 +254,7 @@ def _capture_streaming_event(
properties=posthog_properties,
privacy_mode=posthog_privacy_mode,
groups=posthog_groups,
stop_reason=stop_reason,
)

# Use the common capture function
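A sketch of the message_delta event shape this hunk inspects, using stand-in dataclasses rather than the real anthropic SDK types; field names follow the Anthropic streaming protocol:

    from dataclasses import dataclass
    from typing import Optional

    @dataclass
    class _Delta:  # stand-in for the SDK's message delta object
        stop_reason: Optional[str] = None  # e.g. "end_turn", "max_tokens", "tool_use"

    @dataclass
    class _MessageDeltaEvent:  # stand-in for the SDK's message_delta event
        type: str = "message_delta"
        delta: Optional[_Delta] = None

    event = _MessageDeltaEvent(delta=_Delta(stop_reason="end_turn"))
    # Same guarded access pattern as the hunk above:
    if hasattr(event, "type") and event.type == "message_delta":
        delta = getattr(event, "delta", None)
        if delta is not None:
            stop_reason = getattr(delta, "stop_reason", None)  # -> "end_turn"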
posthog/ai/anthropic/anthropic_async.py (13 additions, 0 deletions)
@@ -131,6 +131,7 @@ async def _create_streaming(
content_blocks: List[StreamingContentBlock] = []
tools_in_progress: Dict[str, ToolInProgress] = {}
current_text_block: Optional[StreamingContentBlock] = None
stop_reason: Optional[str] = None
response = await super().create(**kwargs)

async def generator():
@@ -139,6 +140,7 @@ async def generator():
nonlocal content_blocks
nonlocal tools_in_progress
nonlocal current_text_block
nonlocal stop_reason

try:
async for event in response:
@@ -181,6 +183,14 @@ async def generator():
event, content_blocks, tools_in_progress
)

# Capture stop reason from message_delta events
if hasattr(event, "type") and event.type == "message_delta":
delta = getattr(event, "delta", None)
if delta is not None:
delta_stop_reason = getattr(delta, "stop_reason", None)
if delta_stop_reason is not None:
stop_reason = delta_stop_reason

yield event

finally:
@@ -198,6 +208,7 @@ async def generator():
latency,
content_blocks,
accumulated_content,
stop_reason=stop_reason,
)

return generator()
@@ -214,6 +225,7 @@ async def _capture_streaming_event(
latency: float,
content_blocks: List[StreamingContentBlock],
accumulated_content: str,
stop_reason: Optional[str] = None,
):
from posthog.ai.types import StreamingEventData
from posthog.ai.anthropic.anthropic_converter import (
@@ -242,6 +254,7 @@ async def _capture_streaming_event(
properties=posthog_properties,
privacy_mode=posthog_privacy_mode,
groups=posthog_groups,
stop_reason=stop_reason,
)

# Use the common capture function
posthog/ai/anthropic/anthropic_converter.py (5 additions, 0 deletions)
@@ -190,6 +190,11 @@ def extract_anthropic_web_search_count(response: Any) -> int:
return 0


def extract_anthropic_stop_reason(response: Any) -> Optional[str]:
"""Extract stop reason from Anthropic response."""
return getattr(response, "stop_reason", None)


def extract_anthropic_usage_from_response(response: Any) -> TokenUsage:
"""
Extract usage from a full Anthropic response (non-streaming).
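A minimal usage sketch for the new extractor, with SimpleNamespace standing in for a real Anthropic Message object:

    from types import SimpleNamespace

    from posthog.ai.anthropic.anthropic_converter import extract_anthropic_stop_reason

    assert extract_anthropic_stop_reason(SimpleNamespace(stop_reason="max_tokens")) == "max_tokens"
    assert extract_anthropic_stop_reason(SimpleNamespace()) is None  # attribute absent -> None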
posthog/ai/claude_agent_sdk/processor.py (8 additions, 0 deletions)
@@ -42,6 +42,7 @@ class _GenerationData:
start_time: float = 0.0
end_time: float = 0.0
span_id: str = field(default_factory=lambda: str(uuid.uuid4()))
stop_reason: Optional[str] = None


class _GenerationTracker:
@@ -81,6 +82,10 @@ def process_stream_event(self, event: "StreamEvent") -> None:
# message_delta usage reports cumulative output tokens
if usage.get("output_tokens"):
self._current.output_tokens = usage["output_tokens"]
# Extract stop reason from message_delta
delta_stop_reason = raw.get("delta", {}).get("stop_reason")
if delta_stop_reason is not None:
self._current.stop_reason = delta_stop_reason

elif event_type == "message_stop" and self._current is not None:
self._current.end_time = time.time()
@@ -435,6 +440,9 @@ def _emit_generation(
gen.cache_creation_input_tokens
)

if gen.stop_reason is not None:
properties["$ai_stop_reason"] = gen.stop_reason

if resolved_id is None:
properties["$process_person_profile"] = False

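The dict access above assumes a raw message_delta payload shaped roughly like this (field names follow the Anthropic streaming protocol; values are illustrative):

    raw = {
        "type": "message_delta",
        "delta": {"stop_reason": "end_turn", "stop_sequence": None},
        "usage": {"output_tokens": 42},  # cumulative, per the comment in the hunk
    }
    delta_stop_reason = raw.get("delta", {}).get("stop_reason")  # -> "end_turn"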
posthog/ai/gemini/gemini.py (11 additions, 0 deletions)
@@ -22,6 +22,7 @@
from posthog.ai.gemini.gemini_converter import (
extract_gemini_usage_from_chunk,
extract_gemini_content_from_chunk,
extract_gemini_stop_reason_from_chunk,
format_gemini_streaming_output,
)
from posthog.ai.sanitization import sanitize_gemini
@@ -298,13 +299,15 @@ def _generate_content_streaming(
start_time = time.time()
usage_stats: TokenUsage = TokenUsage(input_tokens=0, output_tokens=0)
accumulated_content = []
stop_reason: Optional[str] = None

kwargs_without_stream = {"model": model, "contents": contents, **kwargs}
response = self._client.models.generate_content_stream(**kwargs_without_stream)

def generator():
nonlocal usage_stats
nonlocal accumulated_content
nonlocal stop_reason
try:
for chunk in response:
# Extract usage stats from chunk
@@ -320,6 +323,11 @@ def generator():
if content_block is not None:
accumulated_content.append(content_block)

# Extract stop reason from chunk
chunk_stop_reason = extract_gemini_stop_reason_from_chunk(chunk)
if chunk_stop_reason is not None:
stop_reason = chunk_stop_reason

yield chunk

finally:
@@ -338,6 +346,7 @@ def generator():
usage_stats,
latency,
accumulated_content,
stop_reason=stop_reason,
)

return generator()
@@ -355,6 +364,7 @@ def _capture_streaming_event(
usage_stats: TokenUsage,
latency: float,
output: Any,
stop_reason: Optional[str] = None,
):
# Prepare standardized event data
formatted_input = self._format_input(contents, **kwargs)
@@ -374,6 +384,7 @@ def generator():
properties=properties,
privacy_mode=privacy_mode,
groups=groups,
stop_reason=stop_reason,
)

# Use the common capture function
posthog/ai/gemini/gemini_async.py (11 additions, 0 deletions)
@@ -22,6 +22,7 @@
from posthog.ai.gemini.gemini_converter import (
extract_gemini_usage_from_chunk,
extract_gemini_content_from_chunk,
extract_gemini_stop_reason_from_chunk,
format_gemini_streaming_output,
)
from posthog.ai.sanitization import sanitize_gemini
@@ -298,6 +299,7 @@ async def _generate_content_streaming(
start_time = time.time()
usage_stats: TokenUsage = TokenUsage(input_tokens=0, output_tokens=0)
accumulated_content = []
stop_reason: Optional[str] = None

kwargs_without_stream = {"model": model, "contents": contents, **kwargs}
response = await self._client.aio.models.generate_content_stream(
@@ -307,6 +309,7 @@ async def async_generator():
async def async_generator():
nonlocal usage_stats
nonlocal accumulated_content
nonlocal stop_reason

try:
async for chunk in response:
@@ -323,6 +326,11 @@ async def async_generator():
if content_block is not None:
accumulated_content.append(content_block)

# Extract stop reason from chunk
chunk_stop_reason = extract_gemini_stop_reason_from_chunk(chunk)
if chunk_stop_reason is not None:
stop_reason = chunk_stop_reason

yield chunk

finally:
@@ -341,6 +349,7 @@ async def async_generator():
usage_stats,
latency,
accumulated_content,
stop_reason=stop_reason,
)

return async_generator()
@@ -358,6 +367,7 @@ def _capture_streaming_event(
usage_stats: TokenUsage,
latency: float,
output: Any,
stop_reason: Optional[str] = None,
):
# Prepare standardized event data
formatted_input = self._format_input(contents, **kwargs)
@@ -377,6 +387,7 @@ def _capture_streaming_event(
properties=properties,
privacy_mode=privacy_mode,
groups=groups,
stop_reason=stop_reason,
)

# Use the common capture function
posthog/ai/gemini/gemini_converter.py (25 additions, 0 deletions)
@@ -287,6 +287,31 @@ def format_gemini_response(response: Any) -> List[FormattedMessage]:
return output


def extract_gemini_stop_reason(response: Any) -> Optional[str]:
"""Extract stop reason from Gemini response."""
if response and hasattr(response, "candidates") and response.candidates:
candidate = response.candidates[0]
finish_reason = getattr(candidate, "finish_reason", None)
if finish_reason is not None:
# Gemini uses enum values — convert to string name
if hasattr(finish_reason, "name"):
return finish_reason.name
return str(finish_reason)
return None


def extract_gemini_stop_reason_from_chunk(chunk: Any) -> Optional[str]:
"""Extract stop reason from a Gemini streaming chunk."""
if chunk and hasattr(chunk, "candidates") and chunk.candidates:
candidate = chunk.candidates[0]
finish_reason = getattr(candidate, "finish_reason", None)
if finish_reason is not None:
if hasattr(finish_reason, "name"):
return finish_reason.name
return str(finish_reason)
return None


def extract_gemini_system_instruction(config: Any) -> Optional[str]:
"""
Extract system instruction from Gemini config parameter.
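The .name branch exists because google-genai reports finish reasons as enum members, which the extractor normalizes to plain strings. A sketch with a stand-in enum:

    import enum
    from types import SimpleNamespace

    from posthog.ai.gemini.gemini_converter import extract_gemini_stop_reason_from_chunk

    class FinishReason(enum.Enum):  # stand-in for the google-genai enum
        STOP = 1
        MAX_TOKENS = 2

    chunk = SimpleNamespace(candidates=[SimpleNamespace(finish_reason=FinishReason.STOP)])
    assert extract_gemini_stop_reason_from_chunk(chunk) == "STOP"
    assert extract_gemini_stop_reason_from_chunk(SimpleNamespace(candidates=[])) is None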
posthog/ai/langchain/callbacks.py (9 additions, 0 deletions)
@@ -629,6 +629,15 @@ def _capture_generation(
self._ph_client, self._privacy_mode, completions
)

# Extract stop reason from generation info
if output.generations and output.generations[-1]:
last_gen = output.generations[-1][-1]
gen_info = getattr(last_gen, "generation_info", None)
if isinstance(gen_info, dict):
finish_reason = gen_info.get("finish_reason")
if finish_reason is not None:
event_properties["$ai_stop_reason"] = finish_reason

self._ph_client.capture(
distinct_id=self._distinct_id or trace_id,
event="$ai_generation",
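A sketch of the LLMResult shape this walk assumes, with SimpleNamespace standing in for LangChain's result and generation classes; generation_info keys vary by provider, but "finish_reason" is the common convention:

    from types import SimpleNamespace

    output = SimpleNamespace(
        generations=[[SimpleNamespace(generation_info={"finish_reason": "stop"})]]
    )
    last_gen = output.generations[-1][-1]
    gen_info = getattr(last_gen, "generation_info", None)
    if isinstance(gen_info, dict):
        finish_reason = gen_info.get("finish_reason")  # -> "stop"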