33
44from __future__ import annotations
55import json
6- import threading
76import time
87import traceback
98from typing import List , Dict , Union , Any , Optional
2322from cozeloop .integration .langchain .trace_model .runtime import RuntimeInfo
2423from cozeloop .integration .langchain .util import calc_token_usage , get_prompt_tag
2524
26- _trace_callback_client = None
25+ _trace_callback_client : Optional [Client ] = None
26+
2727
2828class LoopTracer :
2929 @classmethod
@@ -48,16 +48,15 @@ def __init__(self):
4848
4949 def on_llm_start (self , serialized : Dict [str , Any ], prompts : List [str ], ** kwargs : Any ) -> Any :
5050 span_tags = {}
51+ span_name = serialized .get ('name' , 'unknown' )
52+
53+ flow_span = self ._new_flow_span (span_name , 'model' , ** kwargs )
5154 try :
5255 span_tags ['input' ] = ModelTraceInput ([BaseMessage (type = '' , content = prompt ) for prompt in prompts ],
5356 kwargs .get ('invocation_params' , {})).to_json ()
54- span_name = serialized ['name' ]
5557 except Exception as e :
56- span_name = 'unknown'
57- span_tags ['internal_error' ] = repr (e )
58- span_tags ['internal_error_trace' ] = traceback .format_exc ()
58+ flow_span .set_error (e )
5959 finally :
60- flow_span = self ._new_flow_span (span_name , 'model' , ** kwargs )
6160 span_tags .update (_get_model_span_tags (** kwargs ))
6261 self ._set_span_tags (flow_span , span_tags )
6362 # Store some pre-aspect information.
@@ -67,15 +66,14 @@ def on_llm_start(self, serialized: Dict[str, Any], prompts: List[str], **kwargs:
6766
6867 def on_chat_model_start (self , serialized : Dict [str , Any ], messages : List [List [BaseMessage ]], ** kwargs : Any ) -> Any :
6968 span_tags = {}
69+ span_name = serialized .get ('name' , 'unknown' )
70+
71+ flow_span = self ._new_flow_span (span_name , 'model' , ** kwargs )
7072 try :
7173 span_tags ['input' ] = ModelTraceInput (messages , kwargs .get ('invocation_params' , {})).to_json ()
72- span_name = serialized ['name' ]
7374 except Exception as e :
74- span_name = 'unknown'
75- span_tags ['internal_error' ] = repr (e )
76- span_tags ['internal_error_trace' ] = traceback .format_exc ()
75+ flow_span .set_error (e )
7776 finally :
78- flow_span = self ._new_flow_span (span_name , 'model' , ** kwargs )
7977 span_tags .update (_get_model_span_tags (** kwargs ))
8078 self ._set_span_tags (flow_span , span_tags )
8179 # Store some pre-aspect information.
@@ -105,13 +103,8 @@ def on_llm_end(self, response: LLMResult, **kwargs: Any) -> Any:
105103 if run_info is not None and run_info .model_meta is not None :
106104 model_name = run_info .model_meta .model_name
107105 input_messages = run_info .model_meta .message
108- token_usage = {
109- 'input_tokens' : calc_token_usage (input_messages , model_name ),
110- 'output_tokens' : calc_token_usage (response , model_name ),
111- 'tokens' : 0
112- }
113- token_usage ['tokens' ] = token_usage ['input_tokens' ] + token_usage ['output_tokens' ]
114- self ._set_span_tags (flow_span , token_usage , need_convert_tag_value = False )
106+ flow_span .set_input_tokens (calc_token_usage (input_messages , model_name ))
107+ flow_span .set_output_tokens (calc_token_usage (response , model_name ))
115108 # finish flow_span
116109 flow_span .finish ()
117110
@@ -139,24 +132,17 @@ def on_chain_error(self, error: Union[Exception, KeyboardInterrupt], **kwargs: A
139132 if flow_span is None :
140133 span_name = '_Exception' if isinstance (error , Exception ) else '_KeyboardInterrupt'
141134 flow_span = self ._new_flow_span (span_name , 'chain_error' , ** kwargs )
142- flow_span .set_tags ({ ' error' : repr ( error )} )
135+ flow_span .set_error ( error )
143136 flow_span .set_tags ({'error_trace' : traceback .format_exc ()})
144- flow_span .set_tags ({'_status_code' : - 1 })
145137 flow_span .finish ()
146138
147139 def on_tool_start (
148140 self , serialized : Dict [str , Any ], input_str : str , ** kwargs : Any
149141 ) -> Any :
150142 span_tags = {'input' : input_str , ** serialized }
151- try :
152- span_name = serialized ['name' ]
153- except Exception as e :
154- span_name = 'unknown'
155- span_tags ['internal_error' ] = repr (e )
156- span_tags ['internal_error_trace' ] = traceback .format_exc ()
157- finally :
158- flow_span = self ._new_flow_span (span_name , 'tool' , ** kwargs )
159- self ._set_span_tags (flow_span , span_tags )
143+ span_name = serialized .get ('name' , 'unknown' )
144+ flow_span = self ._new_flow_span (span_name , 'tool' , ** kwargs )
145+ self ._set_span_tags (flow_span , span_tags )
160146
161147 def on_tool_end (self , output : str , ** kwargs : Any ) -> Any :
162148 flow_span = self ._get_flow_span (** kwargs )
@@ -170,9 +156,8 @@ def on_tool_error(
170156 if flow_span is None :
171157 span_name = '_Exception' if isinstance (error , Exception ) else '_KeyboardInterrupt'
172158 flow_span = self ._new_flow_span (span_name , 'tool_error' , ** kwargs )
173- flow_span .set_tags ({ ' error' : repr ( error )} )
159+ flow_span .set_error ( error )
174160 flow_span .set_tags ({'error_trace' : traceback .format_exc ()})
175- flow_span .set_tags ({'_status_code' : - 1 })
176161 flow_span .finish ()
177162
178163 def on_text (self , text : str , ** kwargs : Any ) -> Any :
@@ -241,9 +226,7 @@ def _new_flow_span(self, span_name: str, span_type: str, **kwargs: Any) -> Span:
241226 run_id = str (kwargs ['run_id' ])
242227 self .run_map [run_id ] = Run (run_id , flow_span , span_type )
243228 # set default tags
244- # flow_span.set_tags({'space_id': self._space_id})
245- flow_span .set_tags ({'span_type' : span_type })
246- flow_span .set_tags ({'runtime' : RuntimeInfo ().to_json ()})
229+ flow_span .set_runtime (RuntimeInfo ())
247230 return flow_span
248231
249232 def _get_flow_span (self , ** kwargs : Any ) -> Span :
@@ -252,13 +235,6 @@ def _get_flow_span(self, **kwargs: Any) -> Span:
252235 return self .run_map [run_id ].span
253236 return None
254237
255- def _set_internal_error_span (self , error : Exception , ** kwargs : Any ) -> None :
256- flow_span = self ._new_flow_span ('internal_error' , 'error' , ** kwargs )
257- flow_span .set_tags ({'internal_error' : error })
258- flow_span .set_tags ({'internal_error_trace' : traceback .format_exc ()})
259- flow_span .set_tags ({'_status_code' : - 1 })
260- flow_span .finish ()
261-
262238 def _set_span_tags (self , flow_span : Span , tags : Dict [str , Any ], need_convert_tag_value = True ) -> None :
263239 for key , value in tags .items ():
264240 report_value = value
0 commit comments