Skip to content

Commit 22f6fd3

Browse files
author
Andrei Bratu
committed
Fix flow complete bug
1 parent d3dbafa commit 22f6fd3

8 files changed

Lines changed: 137 additions & 183 deletions

File tree

package.json

Lines changed: 30 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -12,56 +12,56 @@
1212
"test": "jest"
1313
},
1414
"dependencies": {
15-
"url-join": "4.0.1",
16-
"form-data": "^4.0.0",
17-
"formdata-node": "^6.0.3",
18-
"node-fetch": "2.7.0",
19-
"qs": "6.11.2",
20-
"readable-stream": "^4.5.2",
21-
"form-data-encoder": "^4.0.2",
2215
"@opentelemetry/api": "1.9.0",
2316
"@opentelemetry/auto-instrumentations-node": "0.53.0",
2417
"@opentelemetry/sdk-metrics": "1.28.0",
2518
"@opentelemetry/sdk-node": "0.55.0",
2619
"@opentelemetry/sdk-trace-node": "1.28.0",
20+
"@traceloop/ai-semantic-conventions": "0.11.6",
2721
"@traceloop/instrumentation-anthropic": "0.11.1",
2822
"@traceloop/instrumentation-cohere": "0.11.1",
2923
"@traceloop/instrumentation-openai": "0.11.3",
30-
"@traceloop/ai-semantic-conventions": "0.11.6",
31-
"uuid": "11.0.3",
32-
"nanoid": "5.0.9",
3324
"cli-progress": "^3.12.0",
25+
"form-data": "^4.0.0",
26+
"form-data-encoder": "^4.0.2",
27+
"formdata-node": "^6.0.3",
3428
"lodash": "4.17.21",
29+
"nanoid": "5.0.9",
30+
"node-fetch": "2.7.0",
3531
"p-map": "7.0.3",
36-
"stable-hash": "0.0.4"
32+
"qs": "6.11.2",
33+
"readable-stream": "^4.5.2",
34+
"stable-hash": "0.0.4",
35+
"url-join": "4.0.1",
36+
"uuid": "11.0.3"
3737
},
3838
"devDependencies": {
39-
"@types/url-join": "4.0.1",
40-
"@types/qs": "6.9.8",
39+
"@anthropic-ai/sdk": "^0.32.1",
40+
"@babel/core": "^7.26.0",
41+
"@babel/plugin-transform-modules-commonjs": "^7.26.3",
42+
"@babel/preset-env": "^7.26.0",
43+
"@trivago/prettier-plugin-sort-imports": "4.3.0",
44+
"@types/cli-progress": "^3.11.6",
45+
"@types/jest": "29.5.5",
46+
"@types/lodash": "4.14.74",
47+
"@types/node": "17.0.33",
4148
"@types/node-fetch": "2.6.9",
49+
"@types/qs": "6.9.8",
4250
"@types/readable-stream": "^4.0.15",
51+
"@types/url-join": "4.0.1",
52+
"babel-jest": "^29.7.0",
53+
"cohere-ai": "^7.15.0",
54+
"dotenv": "^16.4.6",
4355
"fetch-mock-jest": "^1.5.1",
44-
"webpack": "^5.94.0",
45-
"ts-loader": "^9.3.1",
4656
"jest": "29.7.0",
47-
"@types/jest": "29.5.5",
48-
"ts-jest": "29.1.1",
4957
"jest-environment-jsdom": "29.7.0",
50-
"@types/node": "17.0.33",
58+
"jsonschema": "^1.4.1",
59+
"openai": "^4.74.0",
5160
"prettier": "^3.4.2",
61+
"ts-jest": "29.1.1",
62+
"ts-loader": "^9.3.1",
5263
"typescript": "4.6.4",
53-
"openai": "^4.74.0",
54-
"@anthropic-ai/sdk": "^0.32.1",
55-
"cohere-ai": "^7.15.0",
56-
"dotenv": "^16.4.6",
57-
"jsonschema": "^1.4.1",
58-
"@types/cli-progress": "^3.11.6",
59-
"babel-jest": "^29.7.0",
60-
"@babel/core": "^7.26.0",
61-
"@babel/plugin-transform-modules-commonjs": "^7.26.3",
62-
"@babel/preset-env": "^7.26.0",
63-
"@types/lodash": "4.14.74",
64-
"@trivago/prettier-plugin-sort-imports": "4.3.0"
64+
"webpack": "^5.94.0"
6565
},
6666
"browser": {
6767
"fs": false,

src/otel/constants.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,3 +7,4 @@ export const HUMANLOOP_PATH_KEY = "humanloop.file.path";
77
export const HUMANLOOP_META_FUNCTION_NAME = "humanloop.meta.function_name";
88
export const HUMANLOOP_PARENT_SPAN_CTX_KEY = "humanloop.context.parentSpanId";
99
export const HUMANLOOP_TRACE_FLOW_CTX_KEY = "humanloop.context.traceFlow";
10+
export const HUMANLOOP_FLOW_PREREQUISITES_KEY = "humanloop.flow.prerequisites";

src/otel/exporter.ts

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import { HumanloopClient } from "../humanloop.client";
66
import {
77
HUMANLOOP_FILE_KEY,
88
HUMANLOOP_FILE_TYPE_KEY,
9+
HUMANLOOP_FLOW_PREREQUISITES_KEY,
910
HUMANLOOP_LOG_KEY,
1011
HUMANLOOP_PATH_KEY,
1112
} from "./constants";
@@ -30,13 +31,15 @@ export class HumanloopSpanExporter implements SpanExporter {
3031
private shutdownFlag: boolean;
3132
private readonly uploadPromises: Promise<void>[];
3233
private readonly exportedSpans: ReadableSpan[];
34+
private readonly prerequisites: Map<string, Set<string>>;
3335

3436
constructor(client: HumanloopClient) {
3537
this.client = client;
3638
this.spanIdToUploadedLogId = new Map();
3739
this.shutdownFlag = false;
3840
this.uploadPromises = [];
3941
this.exportedSpans = [];
42+
this.prerequisites = new Map();
4043
}
4144

4245
export(spans: ReadableSpan[]): ExportResult {
@@ -69,6 +72,19 @@ export class HumanloopSpanExporter implements SpanExporter {
6972
await this.shutdown();
7073
}
7174

75+
private completeFlowLog(spanId: string) {
76+
for (const [flowLogSpanId, flowChildrenSpanIds] of this.prerequisites) {
77+
if (flowChildrenSpanIds.has(spanId)) {
78+
flowChildrenSpanIds.delete(spanId);
79+
if (flowChildrenSpanIds.size === 0) {
80+
const flowLogId = this.spanIdToUploadedLogId.get(flowLogSpanId)!;
81+
this.client.flows.updateLog(flowLogId, { traceStatus: "complete" });
82+
}
83+
break;
84+
}
85+
}
86+
}
87+
7288
private async exportSpanDispatch(span: ReadableSpan): Promise<void> {
7389
const fileType = span.attributes[HUMANLOOP_FILE_TYPE_KEY];
7490
const parentSpanId = span.parentSpanId;
@@ -130,6 +146,7 @@ export class HumanloopSpanExporter implements SpanExporter {
130146
} catch (error) {
131147
console.error(`Error exporting prompt: ${error}`);
132148
}
149+
this.completeFlowLog(span.spanContext().spanId);
133150
}
134151

135152
private async exportTool(span: ReadableSpan): Promise<void> {
@@ -158,6 +175,7 @@ export class HumanloopSpanExporter implements SpanExporter {
158175
} catch (error) {
159176
console.error(`Error exporting tool: ${error}`);
160177
}
178+
this.completeFlowLog(span.spanContext().spanId);
161179
}
162180

163181
private async exportFlow(span: ReadableSpan): Promise<void> {
@@ -168,6 +186,18 @@ export class HumanloopSpanExporter implements SpanExporter {
168186
logObject.startTime = hrTimeToDate(span.startTime);
169187
logObject.endTime = hrTimeToDate(span.endTime);
170188
logObject.createdAt = hrTimeToDate(span.endTime);
189+
// Spans that must be uploaded before the Flow Span is completed
190+
let prerequisites: string[] | undefined = undefined;
191+
try {
192+
prerequisites = readFromOpenTelemetrySpan(
193+
span,
194+
HUMANLOOP_FLOW_PREREQUISITES_KEY,
195+
) as unknown as string[];
196+
} catch (error) {
197+
prerequisites = [];
198+
}
199+
200+
this.prerequisites.set(span.spanContext().spanId, new Set(prerequisites));
171201

172202
const spanParentId = span.parentSpanId;
173203
const traceParentId = spanParentId
@@ -188,5 +218,6 @@ export class HumanloopSpanExporter implements SpanExporter {
188218
} catch (error) {
189219
console.error("Error exporting flow: ", error, span.spanContext().spanId);
190220
}
221+
this.completeFlowLog(span.spanContext().spanId);
191222
}
192223
}

src/otel/helpers.ts

Lines changed: 2 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ function _listToOtelFormat(lst: NestedList): NestedDict {
3434
*/
3535
export function writeToOpenTelemetrySpan(
3636
span: ReadableSpan,
37-
value: NestedDict | NestedList | AttributeValue,
37+
value: NestedDict | NestedList | AttributeValue | any[],
3838
key: string,
3939
): void {
4040
let toWriteCopy: NestedDict;
@@ -191,16 +191,7 @@ export function isLLMProviderCall(span: ReadableSpan): boolean {
191191
* @returns True if the span was created by the Humanloop SDK, false otherwise
192192
*/
193193
export function isHumanloopSpan(span: ReadableSpan): boolean {
194-
return span.attributes[HUMANLOOP_FILE_TYPE_KEY] !== undefined;
195-
}
196-
197-
/**
198-
* Generates a unique span ID.
199-
*
200-
* @returns A UUID string
201-
*/
202-
export function generateSpanId(): string {
203-
return uuidv4();
194+
return span.name.startsWith("humanloop.");
204195
}
205196

206197
/**

src/otel/processor.ts

Lines changed: 39 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,15 +36,37 @@ interface CompletableSpan {
3636
export class HumanloopSpanProcessor implements SpanProcessor {
3737
private spanExporter: SpanExporter;
3838
private children: Map<string, CompletableSpan[]>;
39+
// List of all span IDs that are contained in a Flow trace
40+
// They are passed to the Exporter as a span attribute
41+
// so the Exporter knows when to complete a trace
42+
private prerequisites: Map<string, string[]>;
3943

4044
constructor(exporter: SpanExporter) {
4145
this.spanExporter = exporter;
4246
this.children = new Map();
47+
this.prerequisites = new Map();
4348
}
4449

4550
async forceFlush(): Promise<void> {}
4651

4752
onStart(span: Span, _: Context): void {
53+
const spanId = span.spanContext().spanId;
54+
const parentSpanId = span.parentSpanId;
55+
if (span.name === "humanloop.flow") {
56+
this.prerequisites.set(spanId, []);
57+
}
58+
if (parentSpanId !== undefined && isHumanloopSpan(span)) {
59+
for (const [traceHead, allTraceNodes] of this.prerequisites) {
60+
if (
61+
parentSpanId === traceHead ||
62+
allTraceNodes.includes(parentSpanId)
63+
) {
64+
allTraceNodes.push(spanId);
65+
this.prerequisites.set(traceHead, allTraceNodes);
66+
break;
67+
}
68+
}
69+
}
4870
// Handle stream case: when Prompt instrumented function calls a provider with streaming: true
4971
// The instrumentor span will end only when the ChunksResponse is consumed, which can happen
5072
// after the span created by the Prompt utility finishes. To handle this, we register all instrumentor
@@ -66,6 +88,7 @@ export class HumanloopSpanProcessor implements SpanProcessor {
6688
*/
6789
onEnd(span: ReadableSpan): void {
6890
if (isHumanloopSpan(span)) {
91+
// Wait for children to complete asynchronously
6992
new Promise<void>((resolve) => {
7093
const checkChildrenSpans = () => {
7194
const childrenSpans = this.children.get(span.spanContext().spanId);
@@ -79,15 +102,28 @@ export class HumanloopSpanProcessor implements SpanProcessor {
79102
};
80103
checkChildrenSpans();
81104
}).then((_) => {
82-
// All children/ instrumentor spans have arrived, we can process the
105+
// All instrumentor spans have arrived, we can process the
83106
// Humanloop parent span owning them
107+
if (span.name === "humanloop.flow") {
108+
// If the span if a Flow Log, add attribute with all span IDs it
109+
// needs to wait before completion
110+
writeToOpenTelemetrySpan(
111+
span,
112+
this.prerequisites.get(span.spanContext().spanId) || [],
113+
HUMANLOOP_LOG_KEY,
114+
);
115+
this.prerequisites.delete(span.spanContext().spanId);
116+
}
117+
84118
this.processSpanDispatch(
85119
span,
86120
this.children.get(span.spanContext().spanId) || [],
87121
);
122+
88123
// Release references
89124
this.children.delete(span.spanContext().spanId);
90-
// Export the Humanloop span
125+
126+
// Pass Humanloop span to Exporter
91127
this.spanExporter.export([span], (result: ExportResult) => {
92128
if (result.code !== ExportResultCode.SUCCESS) {
93129
console.error("Failed to export span:", result.error);
@@ -182,7 +218,7 @@ export class HumanloopSpanProcessor implements SpanProcessor {
182218
// Placeholder for processing other file types
183219
break;
184220
default:
185-
console.error("Unknown Humanloop File Span", span);
221+
console.error("Unknown Humanloop File span", span);
186222
}
187223
}
188224

src/utilities/flow.ts

Lines changed: 10 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@ import {
1010
HUMANLOOP_PATH_KEY,
1111
HUMANLOOP_TRACE_FLOW_CTX_KEY,
1212
NestedDict,
13-
generateSpanId,
1413
jsonifyIfNotString,
1514
writeToOpenTelemetrySpan,
1615
} from "../otel";
@@ -39,30 +38,9 @@ export function flowUtilityFactory<I, M, O>(
3938
const parentSpanContextKey = createContextKey(HUMANLOOP_PARENT_SPAN_CTX_KEY);
4039
const flowMetadataKey = createContextKey(HUMANLOOP_TRACE_FLOW_CTX_KEY);
4140
// @ts-ignore
42-
return opentelemetryTracer.startActiveSpan(generateSpanId(), async (span) => {
41+
return opentelemetryTracer.startActiveSpan("humanloop.flow", async (span) => {
4342
const ctx = context.active();
4443
const spanId = span.spanContext().spanId;
45-
const parentSpanId = ctx.getValue(parentSpanContextKey) as
46-
| string
47-
| undefined;
48-
const parentFlowMetadata = ctx.getValue(flowMetadataKey) as {
49-
traceId: string;
50-
isFlowLog: boolean;
51-
traceParentId: string;
52-
} | null;
53-
// Handle trace flow context
54-
const flowMetadata =
55-
parentSpanId && parentFlowMetadata
56-
? {
57-
traceId: spanId,
58-
isFlowLog: true,
59-
traceParentId: parentSpanId,
60-
}
61-
: {
62-
traceId: spanId,
63-
traceParentId: null,
64-
isFlowLog: true,
65-
};
6644

6745
// Add span attributes
6846
span = span.setAttribute(HUMANLOOP_PATH_KEY, path || func.name);
@@ -77,26 +55,15 @@ export function flowUtilityFactory<I, M, O>(
7755
);
7856
}
7957

80-
const { output, error } = await context.with(
81-
ctx
82-
.setValue(parentSpanContextKey, spanId)
83-
.setValue(flowMetadataKey, flowMetadata),
84-
async () => {
85-
let output: O | null;
86-
let error: string | null = null;
87-
try {
88-
output = await func(inputs, messages);
89-
} catch (err: any) {
90-
console.error(`Error calling ${func.name}:`, err);
91-
output = null;
92-
error = err.message || String(err);
93-
}
94-
return {
95-
output,
96-
error,
97-
};
98-
},
99-
);
58+
let output: O | null;
59+
let error: string | null = null;
60+
try {
61+
output = await func(inputs, messages);
62+
} catch (err: any) {
63+
console.error(`Error calling ${func.name}:`, err);
64+
output = null;
65+
error = err.message || String(err);
66+
}
10067

10168
const outputStringified = jsonifyIfNotString(func, output);
10269

0 commit comments

Comments
 (0)