Skip to content

Commit 2d2aa3c

Browse files
committed
fix exporter bugs
1 parent b8b81f1 commit 2d2aa3c

10 files changed

Lines changed: 101 additions & 149 deletions

veadk/runner.py

Lines changed: 1 addition & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@
2727
from veadk.agents.sequential_agent import SequentialAgent
2828
from veadk.evaluation import EvalSetRecorder
2929
from veadk.memory.short_term_memory import ShortTermMemory
30-
from veadk.tracing.base_tracer import UserMessagePlugin
3130
from veadk.types import MediaMessage
3231
from veadk.utils.logger import get_logger
3332
from veadk.utils.misc import read_png_to_bytes
@@ -73,17 +72,6 @@ def __init__(
7372
else:
7473
self.long_term_memory = None
7574

76-
# process plugins
77-
try:
78-
# try to detect tracer
79-
_ = self.agent.tracers[0]
80-
if not plugins:
81-
plugins = [UserMessagePlugin(name="user_message_plugin")]
82-
else:
83-
plugins.append(UserMessagePlugin(name="user_message_plugin"))
84-
except Exception:
85-
logger.debug("Agent has no tracers, telemetry plugin not added.")
86-
8775
self.runner = ADKRunner(
8876
app_name=self.app_name,
8977
agent=self.agent,
@@ -201,7 +189,7 @@ def _print_trace_id(self):
201189
return
202190

203191
try:
204-
trace_id = self.agent.tracers[0].get_trace_id() # type: ignore
192+
trace_id = self.agent.tracers[0].trace_id # type: ignore
205193
logger.info(f"Trace id: {trace_id}")
206194
except Exception as e:
207195
logger.warning(f"Get tracer id failed as {e}")

veadk/tracing/base_tracer.py

Lines changed: 0 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -13,69 +13,13 @@
1313
# limitations under the License.
1414

1515
from abc import ABC, abstractmethod
16-
from typing import Optional
1716

18-
from google.adk.agents.invocation_context import InvocationContext
19-
from google.adk.plugins.base_plugin import BasePlugin
20-
from google.genai import types
21-
from opentelemetry import trace
2217

2318
from veadk.utils.logger import get_logger
2419

2520
logger = get_logger(__name__)
2621

2722

28-
class UserMessagePlugin(BasePlugin):
29-
def __init__(self, name: str):
30-
super().__init__(name)
31-
32-
async def on_user_message_callback(
33-
self,
34-
*,
35-
invocation_context: InvocationContext,
36-
user_message: types.Content,
37-
) -> Optional[types.Content]:
38-
"""Callback executed when a user message is received before an invocation starts.
39-
40-
This callback helps logging and modifying the user message before the
41-
runner starts the invocation.
42-
43-
Args:
44-
invocation_context: The context for the entire invocation.
45-
user_message: The message content input by user.
46-
47-
Returns:
48-
An optional `types.Content` to be returned to the ADK. Returning a
49-
value to replace the user message. Returning `None` to proceed
50-
normally.
51-
"""
52-
trace.get_tracer("gcp.vertex.agent")
53-
span = trace.get_current_span()
54-
55-
logger.debug(f"User message plugin works, catch {span}")
56-
span_name = getattr(span, "name", None)
57-
if span_name and span_name.startswith("invocation"):
58-
agent_name = invocation_context.agent.name
59-
invoke_branch = (
60-
invocation_context.branch if invocation_context.branch else agent_name
61-
)
62-
current_session = invocation_context.session
63-
64-
span.set_attribute("app.name", current_session.app_name)
65-
span.set_attribute("user.id", current_session.user_id)
66-
span.set_attribute("session.id", current_session.id)
67-
68-
span.set_attribute("agent.name", agent_name)
69-
span.set_attribute("invoke.branch", invoke_branch)
70-
span.set_attribute("gen_ai.system", "veadk")
71-
72-
logger.debug(
73-
f"Add attributes to {span_name}: app_name={current_session.app_name}, user_id={current_session.user_id}, session_id={current_session.id}, agent_name={agent_name}, invoke_branch={invoke_branch}"
74-
)
75-
76-
return None
77-
78-
7923
def replace_bytes_with_empty(data):
8024
"""
8125
Recursively traverse the data structure and replace all bytes types with empty strings.
Lines changed: 7 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,15 @@
11
from veadk.tracing.telemetry.attributes.extractors.common_attributes_extractors import (
2-
common_gen_ai_app_name,
3-
common_gen_ai_session_id,
4-
common_gen_ai_system,
5-
common_gen_ai_system_version,
6-
common_gen_ai_user_id,
2+
COMMON_ATTRIBUTES,
73
)
8-
from veadk.tracing.telemetry.attributes.extractors.llm_attributes_extrators import (
4+
from veadk.tracing.telemetry.attributes.extractors.llm_attributes_extractors import (
95
LLM_ATTRIBUTES,
106
)
7+
from veadk.tracing.telemetry.attributes.extractors.tool_attributes_extractors import (
8+
TOOL_ATTRIBUTES,
9+
)
1110

1211
ATTRIBUTES = {
13-
"common": {
14-
"gen_ai.system": common_gen_ai_system,
15-
"gen_ai.system_version": common_gen_ai_system_version,
16-
"gen_ai.app.name": common_gen_ai_app_name,
17-
"gen_ai.user.id": common_gen_ai_user_id,
18-
"gen_ai.session.id": common_gen_ai_session_id,
19-
},
12+
"common": COMMON_ATTRIBUTES,
2013
"llm": LLM_ATTRIBUTES,
21-
"tool": ...,
14+
"tool": TOOL_ATTRIBUTES,
2215
}
Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,33 @@
11
from veadk.version import VERSION
22

33

4-
def common_gen_ai_system() -> str:
4+
def common_gen_ai_system(**kwargs) -> str:
55
return "veadk"
66

77

8-
def common_gen_ai_system_version() -> str:
8+
def common_gen_ai_system_version(**kwargs) -> str:
99
return VERSION
1010

1111

1212
def common_gen_ai_app_name(**kwargs) -> str:
13-
return kwargs.get("app_name", "<unknown_app_name>")
13+
app_name = kwargs.get("app_name")
14+
return app_name or "<unknown_app_name>"
1415

1516

1617
def common_gen_ai_user_id(**kwargs) -> str:
17-
return kwargs.get("user_id", "")
18+
user_id = kwargs.get("user_id")
19+
return user_id or "<unknown_user_id>"
1820

1921

2022
def common_gen_ai_session_id(**kwargs) -> str:
21-
return kwargs.get("session_id", "<unknown_session_id>")
23+
session_id = kwargs.get("session_id")
24+
return session_id or "<unknown_session_id>"
25+
26+
27+
COMMON_ATTRIBUTES = {
28+
"gen_ai.system": common_gen_ai_system,
29+
"gen_ai.system.version": common_gen_ai_system_version,
30+
"gen_ai.app.name": common_gen_ai_app_name,
31+
"gen_ai.user.id": common_gen_ai_user_id,
32+
"gen_ai.session.id": common_gen_ai_session_id,
33+
}

veadk/tracing/telemetry/attributes/extractors/llm_attributes_extrators.py renamed to veadk/tracing/telemetry/attributes/extractors/llm_attributes_extractors.py

File renamed without changes.
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
from typing import Any
2+
3+
from attr import dataclass
4+
from google.adk.events import Event
5+
from google.adk.tools import BaseTool
6+
7+
8+
@dataclass
9+
class ToolAttributesParams:
10+
tool: BaseTool
11+
args: dict[str, Any]
12+
function_response_event: Event
13+
14+
15+
TOOL_ATTRIBUTES = {}

veadk/tracing/telemetry/exporters/cozeloop_exporter.py

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15+
from typing import Any
16+
1517
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
1618
from opentelemetry.sdk.trace.export import BatchSpanProcessor
1719
from pydantic import BaseModel, Field
@@ -41,18 +43,23 @@ class CozeloopExporterConfig(BaseModel):
4143
)
4244

4345

46+
class _CozeloopExporter(OTLPSpanExporter):
47+
pass
48+
49+
4450
class CozeloopExporter(BaseExporter):
4551
config: CozeloopExporterConfig = Field(default_factory=CozeloopExporterConfig)
4652

47-
def model_post_init(self) -> None:
53+
def model_post_init(self, context: Any) -> None:
4854
headers = {
4955
"cozeloop-workspace-id": self.config.space_id,
56+
"authorization": f"Bearer {self.config.token}",
5057
}
5158
self.headers |= headers
5259

5360
self._exporter = OTLPSpanExporter(
5461
endpoint=self.config.endpoint,
55-
headers=headers,
62+
headers=self.headers,
5663
timeout=10,
5764
)
5865

veadk/tracing/telemetry/exporters/inmemory_exporter.py

Lines changed: 10 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -23,46 +23,30 @@
2323
logger = get_logger(__name__)
2424

2525

26+
in_memory_exporter_instance = None
27+
28+
2629
# ======== Adapted from Google ADK ========
2730
class _InMemoryExporter(export.SpanExporter):
28-
def __init__(self):
31+
def __init__(self) -> None:
2932
super().__init__()
3033
self._spans = []
31-
self.session_trace_dict = {}
3234
self.trace_id = ""
33-
self.prompt_tokens = []
34-
self.completion_tokens = []
35+
self.span_dict = {}
36+
self.session_trace_dict = {}
3537

3638
@override
3739
def export(self, spans: Sequence[ReadableSpan]) -> export.SpanExportResult:
3840
for span in spans:
3941
if span.context:
40-
trace_id = span.context.trace_id
41-
self.trace_id = trace_id
42+
self.trace_id = span.context.trace_id
43+
44+
span_id = span.context.span_id
45+
self.span_dict[span_id] = span
4246
else:
4347
logger.warning(
4448
f"Span context is missing, failed to get `trace_id`. span: {span}"
4549
)
46-
47-
if span.name == "call_llm":
48-
attributes = dict(span.attributes or {})
49-
prompt_token = attributes.get("gen_ai.usage.prompt_tokens", None)
50-
completion_token = attributes.get(
51-
"gen_ai.usage.completion_tokens", None
52-
)
53-
if prompt_token:
54-
self.prompt_tokens.append(prompt_token)
55-
if completion_token:
56-
self.completion_tokens.append(completion_token)
57-
58-
if span.name == "call_llm":
59-
attributes = dict(span.attributes or {})
60-
session_id = attributes.get("gcp.vertex.agent.session_id", None)
61-
if session_id:
62-
if session_id not in self.session_trace_dict:
63-
self.session_trace_dict[session_id] = [trace_id]
64-
else:
65-
self.session_trace_dict[session_id] += [trace_id]
6650
self._spans.extend(spans)
6751
return export.SpanExportResult.SUCCESS
6852

veadk/tracing/telemetry/opentelemetry_tracer.py

Lines changed: 6 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
import time
1919
from typing import Any
2020

21-
from openinference.instrumentation.google_adk import GoogleADKInstrumentor
21+
# from openinference.instrumentation.google_adk import GoogleADKInstrumentor
2222
from opentelemetry import trace as trace_api
2323
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
2424
from opentelemetry.sdk import trace as trace_sdk
@@ -79,7 +79,7 @@ def model_post_init(self, context: Any) -> None:
7979
# VeADK operates on global OpenTelemetry provider, return nothing
8080
self._init_global_tracer_provider()
8181

82-
GoogleADKInstrumentor().instrument()
82+
# GoogleADKInstrumentor().instrument()
8383

8484
def _init_global_tracer_provider(self) -> None:
8585
# set provider anyway, then get global provider
@@ -132,7 +132,7 @@ def trace_file_path(self) -> str:
132132
@property
133133
def trace_id(self) -> str:
134134
try:
135-
trace_id = hex(int(self._inmemory_exporter._exporter.trace_id))[2:]
135+
trace_id = hex(int(self._inmemory_exporter._exporter.trace_id))[2:] # type: ignore
136136
return trace_id
137137
except Exception as e:
138138
logger.error(f"Failed to get trace_id from InMemoryExporter: {e}")
@@ -151,23 +151,11 @@ def dump(
151151
)
152152
return ""
153153

154-
prompt_tokens = self._inmemory_exporter._real_exporter.prompt_tokens
155-
completion_tokens = self._inmemory_exporter._real_exporter.completion_tokens
156-
157-
# upload
158-
for meter_uploader in self._meter_uploaders:
159-
meter_uploader.record(
160-
prompt_tokens=prompt_tokens, completion_tokens=completion_tokens
161-
)
162-
# clear tokens after dump
163-
self._inmemory_exporter._real_exporter.completion_tokens = []
164-
self._inmemory_exporter._real_exporter.prompt_tokens = []
165-
166154
for processor in self._processors:
167155
time.sleep(0.05) # give some time for the exporter to upload spans
168156
processor.force_flush()
169157

170-
spans = self._inmemory_exporter._real_exporter.get_finished_spans(
158+
spans = self._inmemory_exporter._exporter.get_finished_spans( # type: ignore
171159
session_id=session_id
172160
)
173161
if not spans:
@@ -186,14 +174,11 @@ def dump(
186174
for s in spans
187175
]
188176

189-
trace_id = hex(int(self._inmemory_exporter._real_exporter.trace_id))[2:]
190-
self._trace_id = trace_id
191-
file_path = f"{path}/{self.name}_{user_id}_{session_id}_{trace_id}.json"
177+
file_path = f"{path}/{self.name}_{user_id}_{session_id}_{self.trace_id}.json"
192178
with open(file_path, "w") as f:
193179
json.dump(
194180
data, f, indent=4, ensure_ascii=False
195181
) # ensure_ascii=False to support Chinese characters
196-
197182
self._trace_file_path = file_path
198183

199184
for exporter in self.exporters:
@@ -203,6 +188,6 @@ def dump(
203188
f"OpenTelemetryTracer tracing done, trace id: {self._trace_id} (hex)"
204189
)
205190

206-
self._spans = spans
207191
logger.info(f"OpenTelemetryTracer dumps {len(spans)} spans to {file_path}")
192+
208193
return file_path

0 commit comments

Comments
 (0)