Skip to content

Commit 4658307

Browse files
author
Andrei Bratu
committed
PR feedback
1 parent 22f6fd3 commit 4658307

7 files changed

Lines changed: 156 additions & 153 deletions

File tree

src/otel/constants.ts

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,9 @@ export const HUMANLOOP_LOG_KEY = "humanloop.log";
55
export const HUMANLOOP_FILE_TYPE_KEY = "humanloop.file_type";
66
export const HUMANLOOP_PATH_KEY = "humanloop.file.path";
77
export const HUMANLOOP_META_FUNCTION_NAME = "humanloop.meta.function_name";
8-
export const HUMANLOOP_PARENT_SPAN_CTX_KEY = "humanloop.context.parentSpanId";
9-
export const HUMANLOOP_TRACE_FLOW_CTX_KEY = "humanloop.context.traceFlow";
108
export const HUMANLOOP_FLOW_PREREQUISITES_KEY = "humanloop.flow.prerequisites";
9+
10+
export const HUMANLOOP_SPAN_PREFIX = "humanloop.";
11+
export const HUMANLOOP_FLOW_SPAN_NAME = `${HUMANLOOP_SPAN_PREFIX}flow`;
12+
export const HUMANLOOP_PROMPT_SPAN_NAME = `${HUMANLOOP_SPAN_PREFIX}prompt`;
13+
export const HUMANLOOP_TOOL_SPAN_NAME = `${HUMANLOOP_SPAN_PREFIX}tool`;

src/otel/exporter.ts

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@ export class HumanloopSpanExporter implements SpanExporter {
3131
private shutdownFlag: boolean;
3232
private readonly uploadPromises: Promise<void>[];
3333
private readonly exportedSpans: ReadableSpan[];
34+
// List of spans that must be uploaded before completing the Flow log
35+
// This maps [flow log span ID] -> [set of child span IDs]
3436
private readonly prerequisites: Map<string, Set<string>>;
3537

3638
constructor(client: HumanloopClient) {
@@ -72,7 +74,20 @@ export class HumanloopSpanExporter implements SpanExporter {
7274
await this.shutdown();
7375
}
7476

75-
private completeFlowLog(spanId: string) {
77+
/**
78+
* Mark a span as uploaded to the Humanloop.
79+
*
80+
* A Log might be contained inside a Flow trace, which must be marked as complete
81+
* when all its children are uploaded. Each Flow Log span contains a
82+
* 'humanloop.flow.prerequisites' attribute, which is a list of all spans that must
83+
* be uploaded before the Flow Log is marked as complete.
84+
*
85+
* This method finds the trace the Span belongs to and removes the Span from the list.
86+
* Once all prerequisites are uploaded, the method marks the Flow Log as complete.
87+
*
88+
* @param spanId - The ID of the span that has been uploaded.
89+
*/
90+
private notifySpanUploaded(spanId: string) {
7691
for (const [flowLogSpanId, flowChildrenSpanIds] of this.prerequisites) {
7792
if (flowChildrenSpanIds.has(spanId)) {
7893
flowChildrenSpanIds.delete(spanId);
@@ -146,7 +161,7 @@ export class HumanloopSpanExporter implements SpanExporter {
146161
} catch (error) {
147162
console.error(`Error exporting prompt: ${error}`);
148163
}
149-
this.completeFlowLog(span.spanContext().spanId);
164+
this.notifySpanUploaded(span.spanContext().spanId);
150165
}
151166

152167
private async exportTool(span: ReadableSpan): Promise<void> {
@@ -175,7 +190,7 @@ export class HumanloopSpanExporter implements SpanExporter {
175190
} catch (error) {
176191
console.error(`Error exporting tool: ${error}`);
177192
}
178-
this.completeFlowLog(span.spanContext().spanId);
193+
this.notifySpanUploaded(span.spanContext().spanId);
179194
}
180195

181196
private async exportFlow(span: ReadableSpan): Promise<void> {
@@ -218,6 +233,6 @@ export class HumanloopSpanExporter implements SpanExporter {
218233
} catch (error) {
219234
console.error("Error exporting flow: ", error, span.spanContext().spanId);
220235
}
221-
this.completeFlowLog(span.spanContext().spanId);
236+
this.notifySpanUploaded(span.spanContext().spanId);
222237
}
223238
}

src/otel/helpers.ts

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,8 @@
11
import { AttributeValue, SpanKind } from "@opentelemetry/api";
22
import { ReadableSpan } from "@opentelemetry/sdk-trace-base";
3-
import { v4 as uuidv4 } from "uuid";
43

54
// Constants for Humanloop attributes
6-
import { HUMANLOOP_FILE_TYPE_KEY } from "./constants";
5+
import { HUMANLOOP_SPAN_PREFIX } from "./constants";
76

87
export type NestedDict = { [key: string]: NestedDict | AttributeValue };
98
export type NestedList = Array<NestedDict | AttributeValue>;
@@ -191,7 +190,7 @@ export function isLLMProviderCall(span: ReadableSpan): boolean {
191190
* @returns True if the span was created by the Humanloop SDK, false otherwise
192191
*/
193192
export function isHumanloopSpan(span: ReadableSpan): boolean {
194-
return span.name.startsWith("humanloop.");
193+
return span.name.startsWith(HUMANLOOP_SPAN_PREFIX);
195194
}
196195

197196
/**

src/otel/processor.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import { PromptKernelRequest } from "../api/types/PromptKernelRequest";
1313
import {
1414
HUMANLOOP_FILE_KEY,
1515
HUMANLOOP_FILE_TYPE_KEY,
16+
HUMANLOOP_FLOW_SPAN_NAME,
1617
HUMANLOOP_LOG_KEY,
1718
HUMANLOOP_META_FUNCTION_NAME,
1819
} from "./constants";
@@ -52,7 +53,7 @@ export class HumanloopSpanProcessor implements SpanProcessor {
5253
onStart(span: Span, _: Context): void {
5354
const spanId = span.spanContext().spanId;
5455
const parentSpanId = span.parentSpanId;
55-
if (span.name === "humanloop.flow") {
56+
if (span.name === HUMANLOOP_FLOW_SPAN_NAME) {
5657
this.prerequisites.set(spanId, []);
5758
}
5859
if (parentSpanId !== undefined && isHumanloopSpan(span)) {
@@ -104,7 +105,7 @@ export class HumanloopSpanProcessor implements SpanProcessor {
104105
}).then((_) => {
105106
// All instrumentor spans have arrived, we can process the
106107
// Humanloop parent span owning them
107-
if (span.name === "humanloop.flow") {
108+
if (span.name === HUMANLOOP_FLOW_SPAN_NAME) {
108109
// If the span if a Flow Log, add attribute with all span IDs it
109110
// needs to wait before completion
110111
writeToOpenTelemetrySpan(

src/utilities/flow.ts

Lines changed: 41 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,12 @@
1-
import { context, createContextKey } from "@opentelemetry/api";
21
import { ReadableSpan, Tracer } from "@opentelemetry/sdk-trace-node";
32

43
import { FlowKernelRequest } from "../api/types/FlowKernelRequest";
54
import {
65
HUMANLOOP_FILE_TYPE_KEY,
6+
HUMANLOOP_FLOW_SPAN_NAME,
77
HUMANLOOP_LOG_KEY,
88
HUMANLOOP_META_FUNCTION_NAME,
9-
HUMANLOOP_PARENT_SPAN_CTX_KEY,
109
HUMANLOOP_PATH_KEY,
11-
HUMANLOOP_TRACE_FLOW_CTX_KEY,
1210
NestedDict,
1311
jsonifyIfNotString,
1412
writeToOpenTelemetrySpan,
@@ -35,55 +33,53 @@ export function flowUtilityFactory<I, M, O>(
3533
);
3634
}
3735

38-
const parentSpanContextKey = createContextKey(HUMANLOOP_PARENT_SPAN_CTX_KEY);
39-
const flowMetadataKey = createContextKey(HUMANLOOP_TRACE_FLOW_CTX_KEY);
4036
// @ts-ignore
41-
return opentelemetryTracer.startActiveSpan("humanloop.flow", async (span) => {
42-
const ctx = context.active();
43-
const spanId = span.spanContext().spanId;
37+
return opentelemetryTracer.startActiveSpan(
38+
HUMANLOOP_FLOW_SPAN_NAME,
39+
async (span) => {
40+
// Add span attributes
41+
span = span.setAttribute(HUMANLOOP_PATH_KEY, path || func.name);
42+
span = span.setAttribute(HUMANLOOP_FILE_TYPE_KEY, "flow");
43+
span = span.setAttribute(HUMANLOOP_META_FUNCTION_NAME, func.name);
4444

45-
// Add span attributes
46-
span = span.setAttribute(HUMANLOOP_PATH_KEY, path || func.name);
47-
span = span.setAttribute(HUMANLOOP_FILE_TYPE_KEY, "flow");
48-
span = span.setAttribute(HUMANLOOP_META_FUNCTION_NAME, func.name);
45+
if (version) {
46+
writeToOpenTelemetrySpan(
47+
span as unknown as ReadableSpan,
48+
version as unknown as NestedDict,
49+
"humanloop.file.flow",
50+
);
51+
}
4952

50-
if (version) {
51-
writeToOpenTelemetrySpan(
52-
span as unknown as ReadableSpan,
53-
version as unknown as NestedDict,
54-
"humanloop.file.flow",
55-
);
56-
}
57-
58-
let output: O | null;
59-
let error: string | null = null;
60-
try {
61-
output = await func(inputs, messages);
62-
} catch (err: any) {
63-
console.error(`Error calling ${func.name}:`, err);
64-
output = null;
65-
error = err.message || String(err);
66-
}
53+
let output: O | null;
54+
let error: string | null = null;
55+
try {
56+
output = await func(inputs, messages);
57+
} catch (err: any) {
58+
console.error(`Error calling ${func.name}:`, err);
59+
output = null;
60+
error = err.message || String(err);
61+
}
6762

68-
const outputStringified = jsonifyIfNotString(func, output);
63+
const outputStringified = jsonifyIfNotString(func, output);
6964

70-
const flowLog = {
71-
output: outputStringified,
72-
inputs: inputs,
73-
messages: messages,
74-
error,
75-
};
65+
const flowLog = {
66+
output: outputStringified,
67+
inputs: inputs,
68+
messages: messages,
69+
error,
70+
};
7671

77-
writeToOpenTelemetrySpan(
78-
span as unknown as ReadableSpan,
79-
// @ts-ignore
80-
flowLog,
81-
HUMANLOOP_LOG_KEY,
82-
);
72+
writeToOpenTelemetrySpan(
73+
span as unknown as ReadableSpan,
74+
// @ts-ignore
75+
flowLog,
76+
HUMANLOOP_LOG_KEY,
77+
);
8378

84-
span.end();
85-
return output;
86-
});
79+
span.end();
80+
return output;
81+
},
82+
);
8783
};
8884

8985
// @ts-ignore

src/utilities/prompt.ts

Lines changed: 42 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
import { context, createContextKey } from "@opentelemetry/api";
21
import { ReadableSpan, Tracer } from "@opentelemetry/sdk-trace-node";
32

43
import { PromptKernelRequest } from "../api/types/PromptKernelRequest";
@@ -7,9 +6,8 @@ import {
76
HUMANLOOP_FILE_TYPE_KEY,
87
HUMANLOOP_LOG_KEY,
98
HUMANLOOP_META_FUNCTION_NAME,
10-
HUMANLOOP_PARENT_SPAN_CTX_KEY,
119
HUMANLOOP_PATH_KEY,
12-
HUMANLOOP_TRACE_FLOW_CTX_KEY,
10+
HUMANLOOP_PROMPT_SPAN_NAME,
1311
NestedDict,
1412
jsonifyIfNotString,
1513
writeToOpenTelemetrySpan,
@@ -56,57 +54,53 @@ export function promptUtilityFactory<I, M, O>(
5654
.reduce((obj, [key, value]) => ({ ...obj, [key]: value }), {});
5755
}
5856

59-
const parentSpanContextKey = createContextKey(HUMANLOOP_PARENT_SPAN_CTX_KEY);
60-
const flowMetadataKey = createContextKey(HUMANLOOP_TRACE_FLOW_CTX_KEY);
61-
6257
// @ts-ignore
63-
return opentelemetryTracer.startActiveSpan("humanloop.prompt", async (span) => {
64-
const ctx = context.active();
65-
const spanId = span.spanContext().spanId;
58+
return opentelemetryTracer.startActiveSpan(
59+
HUMANLOOP_PROMPT_SPAN_NAME,
60+
async (span) => {
61+
// Add span attributes
62+
span = span.setAttribute(HUMANLOOP_PATH_KEY, path || func.name);
63+
span = span.setAttribute(HUMANLOOP_FILE_TYPE_KEY, "prompt");
64+
span = span.setAttribute(HUMANLOOP_META_FUNCTION_NAME, func.name);
65+
66+
if (version) {
67+
writeToOpenTelemetrySpan(
68+
span as unknown as ReadableSpan,
69+
{
70+
...version,
71+
} as unknown as NestedDict,
72+
"humanloop.file.prompt",
73+
);
74+
}
75+
76+
// Execute the wrapped function in a child context
77+
let output: O | null;
78+
let error: string | null = null;
79+
try {
80+
output = await func(inputs, messages);
81+
} catch (err: any) {
82+
console.error(`Error calling ${func.name}:`, err);
83+
output = null;
84+
error = err.message || String(err);
85+
}
6686

67-
// Add span attributes
68-
span = span.setAttribute(HUMANLOOP_PATH_KEY, path || func.name);
69-
span = span.setAttribute(HUMANLOOP_FILE_TYPE_KEY, "prompt");
70-
span = span.setAttribute(HUMANLOOP_META_FUNCTION_NAME, func.name);
87+
const promptLog = {
88+
output: jsonifyIfNotString(func, output),
89+
error,
90+
inputs: inputs,
91+
messages: messages,
92+
};
7193

72-
if (version) {
7394
writeToOpenTelemetrySpan(
7495
span as unknown as ReadableSpan,
75-
{
76-
...version,
77-
} as unknown as NestedDict,
78-
"humanloop.file.prompt",
96+
// @ts-ignore
97+
promptLog,
98+
HUMANLOOP_LOG_KEY,
7999
);
80-
}
81-
82-
// Execute the wrapped function in a child context
83-
let output: O | null;
84-
let error: string | null = null;
85-
try {
86-
output = await func(inputs, messages);
87-
} catch (err: any) {
88-
console.error(`Error calling ${func.name}:`, err);
89-
// @ts-ignore
90-
output = null;
91-
error = err.message || String(err);
92-
}
93-
94-
const promptLog = {
95-
output: jsonifyIfNotString(func, output),
96-
error,
97-
inputs: inputs,
98-
messages: messages,
99-
};
100-
101-
writeToOpenTelemetrySpan(
102-
span as unknown as ReadableSpan,
103-
// @ts-ignore
104-
promptLog,
105-
HUMANLOOP_LOG_KEY,
106-
);
107100

108-
span.end();
109-
return output;
110-
});
101+
span.end();
102+
return output;
103+
},
104+
);
111105
};
112106
}

0 commit comments

Comments
 (0)