diff --git a/example-apps/collector/.env b/example-apps/collector/.env index f94dabfe58..00dccf7bf8 100644 --- a/example-apps/collector/.env +++ b/example-apps/collector/.env @@ -1,6 +1,6 @@ # All default settings defined here can be overridden by environment variables. -# MODE=npm +MODE=local APP_PORT=2807 SENSOR_ENABLED=true TRACING_ENABLED=true diff --git a/example-apps/collector/src/index.js b/example-apps/collector/src/index.js index f2f27aa497..4aec420fd6 100644 --- a/example-apps/collector/src/index.js +++ b/example-apps/collector/src/index.js @@ -19,6 +19,9 @@ if (config.mode === 'npm') { packageToRequire = '@instana/collector'; } +process.env.INSTANA_METRICS_TRANSMISSION_DELAY = 5000; +process.env.INSTANA_OTLP_FORMAT = 'true'; + if (config.collectorEnabled) { console.log(`enabling @instana/collector (requiring ${packageToRequire})`); require(packageToRequire)({ diff --git a/packages/collector/src/agentConnection.js b/packages/collector/src/agentConnection.js index 7ca47d69c5..63f9fa2669 100644 --- a/packages/collector/src/agentConnection.js +++ b/packages/collector/src/agentConnection.js @@ -10,7 +10,7 @@ const pathUtil = require('path'); const circularReferenceRemover = require('./util/removeCircular'); const agentOpts = require('./agent/opts'); const cmdline = require('./cmdline'); - +const otlpTransformer = require('@instana/core/src/tracing/otlpTransformer'); /** @typedef {import('@instana/core/src/core').InstanaBaseSpan} InstanaBaseSpan */ /** @type {import('@instana/core/src/core').GenericLogger} */ @@ -307,20 +307,55 @@ function checkWhetherResponseForPathIsOkay(path, cb) { exports.sendMetrics = function sendMetrics(data, cb) { cb = util.atMostOnce('callback for sendMetrics', cb); - sendData(`/com.instana.plugin.nodejs.${pidStore.pid}`, data, (err, body) => { + // Zeige nur die ersten 2 Keys für Debugging + const dataKeys = Object.keys(data); + const firstTwoKeys = {}; + for (let i = 0; i < Math.min(2, dataKeys.length); i++) { + firstTwoKeys[dataKeys[i]] = data[dataKeys[i]]; + } + + // logger.debug(`sendMetrics called with data (first 2 keys): ${JSON.stringify(firstTwoKeys)}`); + + // Transform Instana metrics to OTLP format + const otlpMetrics = otlpTransformer.transformMetrics(data); + + // Zeige nur die ersten 2 Metriken für Debugging + let otlpPreview = otlpMetrics; + if (otlpMetrics.resourceMetrics && otlpMetrics.resourceMetrics.length > 0) { + const firstResource = otlpMetrics.resourceMetrics[0]; + if (firstResource.scopeMetrics && firstResource.scopeMetrics.length > 0) { + const metrics = firstResource.scopeMetrics[0].metrics; + if (metrics && metrics.length > 2) { + otlpPreview = { + resourceMetrics: [ + { + ...firstResource, + scopeMetrics: [ + { + ...firstResource.scopeMetrics[0], + metrics: metrics.slice(0, 2) + } + ] + } + ], + totalMetrics: metrics.length + }; + } + } + } + + // logger.debug(`Transformed to OTLP (first 2 metrics) ${JSON.stringify(otlpPreview)}`); + + // Send directly without using sendData (which would transform again) + sendOtlpData('/v1/metrics', otlpMetrics, err => { if (err) { + logger.error('Error sending metrics:', err); cb(err, null); } else { - try { - // 2016-09-11 - // Older sensor versions will not repond with a JSON - // structure. Support a smooth update path. - body = JSON.parse(body); - } catch (e) { - body = []; - } - - cb(null, body); + // logger.debug('Metrics sent successfully'); + // OTLP endpoints don't return requests like the old Instana endpoint + // Always return empty array for compatibility + cb(null, []); } }); }; @@ -335,16 +370,16 @@ exports.sendSpans = function sendSpans(spans, cb) { if (err && !maxContentErrorHasBeenLogged && err instanceof PayloadTooLargeError) { logLargeSpans(spans); } else if (err) { - const spanInfo = getSpanLengthInfo(spans); + const spanInfo = spans; logger.debug(`Failed to send: ${JSON.stringify(spanInfo)}`); } else { - const spanInfo = getSpanLengthInfo(spans); + const spanInfo = spans; logger.debug(`Successfully sent: ${JSON.stringify(spanInfo)}`); } cb(err); }); - sendData(`/com.instana.plugin.nodejs/traces.${pidStore.pid}`, spans, callback, true); + sendData('/v1/traces', spans, callback, true); }; /** @@ -425,7 +460,8 @@ exports.sendTracingMetricsToAgent = function sendTracingMetricsToAgent(tracingMe cb(err); }); - sendData('/tracermetrics', tracingMetrics, callback); + // sendData('/tracermetrics', tracingMetrics, callback); + cb(); }; /** @@ -437,6 +473,11 @@ exports.sendTracingMetricsToAgent = function sendTracingMetricsToAgent(tracingMe */ function sendData(path, data, cb, ignore404 = false) { cb = util.atMostOnce(`callback for sendData: ${path}`, cb); + console.log(JSON.stringify(data)); + // Transform Instana format to OTLP format + // const otlpFormat = otlpTransformer(data); + + // console.log(JSON.stringify(otlpFormat)); const payloadAsString = JSON.stringify(data, circularReferenceRemover()); if (typeof logger.trace === 'function') { @@ -455,7 +496,7 @@ function sendData(path, data, cb, ignore404 = false) { const req = http.request( { host: agentOpts.host, - port: agentOpts.port, + port: 4318, path, method: 'POST', agent: http.agent, @@ -465,24 +506,26 @@ function sendData(path, data, cb, ignore404 = false) { } }, res => { - if (res.statusCode < 200 || res.statusCode >= 300) { - if (res.statusCode !== 404 || !ignore404) { - const statusCodeError = new Error( - `Failed to send data to agent via POST ${path}. Got status code ${res.statusCode}.` - ); - // @ts-ignore - statusCodeError.statusCode = res.statusCode; - cb(statusCodeError); - return; - } - } - res.setEncoding('utf8'); let responseBody = ''; res.on('data', chunk => { responseBody += chunk; }); res.on('end', () => { + console.log(responseBody); + + if (res.statusCode < 200 || res.statusCode >= 300) { + if (res.statusCode !== 404 || !ignore404) { + const statusCodeError = new Error( + `Failed to send data to agent via POST ${path}. Got status code ${res.statusCode}.` + ); + // @ts-ignore + statusCodeError.statusCode = res.statusCode; + cb(statusCodeError); + return; + } + } + cb(null, responseBody); }); } @@ -509,6 +552,86 @@ function sendData(path, data, cb, ignore404 = false) { req.end(); } +/** + * Sendet bereits transformierte OTLP-Daten an den Agent + * @param {string} path - API path + * @param {Object} otlpData - Already transformed OTLP data + * @param {(...args: *) => *} cb - Callback + * @param {boolean} [ignore404] + */ +function sendOtlpData(path, otlpData, cb, ignore404 = false) { + cb = util.atMostOnce(`callback for sendOtlpData: ${path}`, cb); + + const payloadAsString = JSON.stringify(otlpData, circularReferenceRemover()); + if (typeof logger.trace === 'function') { + logger.trace(`Sending OTLP data to ${path}.`); + } else { + logger.debug(`Sending OTLP data to ${path}, ${agentOpts}`); + } + + // Convert payload to a buffer to correctly identify content-length ahead of time. + const payload = Buffer.from(payloadAsString, 'utf8'); + if (payload.length > maxContentLength) { + const error = new PayloadTooLargeError(`Request payload is too large. Will not send data to agent. (POST ${path})`); + return setImmediate(cb.bind(null, error)); + } + + const req = http.request( + { + host: agentOpts.host, + port: 4318, + path, + method: 'POST', + agent: http.agent, + headers: { + 'Content-Type': 'application/json; charset=UTF-8', + 'Content-Length': payload.length + } + }, + res => { + res.setEncoding('utf8'); + let responseBody = ''; + res.on('data', chunk => { + responseBody += chunk; + }); + res.on('end', () => { + if (res.statusCode < 200 || res.statusCode >= 300) { + if (ignore404 && res.statusCode === 404) { + return cb(null, responseBody); + } + return cb( + new Error( + `Failed to send data to agent via POST ${path}. ` + + `Got status code ${res.statusCode}. Response: ${responseBody}` + ), + responseBody + ); + } + cb(null, responseBody); + }); + } + ); + + req.setTimeout(agentOpts.requestTimeout, function onTimeout() { + if (req.destroyed) { + return; + } + + req.destroy(new Error(`Sending data to agent via POST ${path}. Request timeout.`)); + }); + + req.on('error', err => { + if (req.destroyed) { + return; + } + + cb(new Error(`Send OTLP data to agent via POST ${path}. Request failed: ${err.message}`)); + }); + + req.write(payload); + req.end(); +} + exports.isConnected = function () { return isConnected; }; diff --git a/packages/core/src/tracing/backend_mappers/mapper.js b/packages/core/src/tracing/backend_mappers/mapper.js index 3e5ede3272..671c399090 100644 --- a/packages/core/src/tracing/backend_mappers/mapper.js +++ b/packages/core/src/tracing/backend_mappers/mapper.js @@ -47,6 +47,9 @@ const fieldMappings = { * @returns {import('../../core').InstanaBaseSpan} The transformed span. */ module.exports.transform = span => { + if (!span || !span.data) { + return span; + } // In most cases, `span.n` matches the key inside `span.data` (e.g., `span.n === 'redis'` → `span.data.redis`). // However, there are exceptions where `span.n` represents a higher-level concept or protocol, // while `span.data` contains one or more lower-level components. diff --git a/packages/core/src/tracing/otlpTransformer.js b/packages/core/src/tracing/otlpTransformer.js new file mode 100644 index 0000000000..b5cfdab065 --- /dev/null +++ b/packages/core/src/tracing/otlpTransformer.js @@ -0,0 +1,584 @@ +/* + * (c) Copyright IBM Corp. 2026 + */ + +'use strict'; + +const { getOtlpAttributeMappings } = require('./otlp_mapper/mapper'); + +// Cached Resource information for Metrics (when no "from" field is present) +let cachedHostId = null; +let cachedPid = null; + +/** + * Sets the Host-ID for Resource Attributes + * @param {string} hostId - Host ID + */ +function setHostId(hostId) { + cachedHostId = hostId; +} + +/** + * Sets the PID for Resource Attributes + * @param {string|number} pid - Process ID + */ +function setPid(pid) { + cachedPid = String(pid); +} + +/** + * Span kind mapping rules for special cases + * Maps specific span types to OTEL span kinds based on data context + */ +const spanKindRules = { + kafka: { + dataKey: 'kafka', + resolver: data => { + // OTEL: 4=PRODUCER, 5=CONSUMER + // Visible in UI after changing to 2 and 3 + if (data.access === 'send') return 2; + if (data.access === 'consume') return 3; + return null; // Fall back to default mapping + } + } +}; + +/** + * Converts Instana Span Kind to OTEL Span Kind + * @param {number} instanaKind - Instana span kind (1=ENTRY, 2=EXIT, 3=INTERMEDIATE) + * @param {string} spanType - Instana span type (e.g., 'node.http.server', 'kafka') + * @param {Object} data - Span data for additional context + * @returns {number} OTEL span kind + */ +function convertSpanKind(instanaKind, spanType, data) { + // OTEL: 0=UNSPECIFIED, 1=INTERNAL, 2=SERVER, 3=CLIENT, 4=PRODUCER, 5=CONSUMER + + // Check for special span kind rules + const rule = spanKindRules[spanType]; + if (rule && data && data[rule.dataKey]) { + const resolvedKind = rule.resolver(data[rule.dataKey]); + if (resolvedKind !== null) { + return resolvedKind; + } + } + + // Standard Instana kind mapping + switch (instanaKind) { + case 1: // ENTRY -> SERVER + return 2; + case 2: // EXIT -> CLIENT + return 3; + case 3: // INTERMEDIATE -> INTERNAL + return 1; + default: + return 0; // UNSPECIFIED + } +} + +/** + * Converts milliseconds to nanoseconds (as String) + * @param {number} ms - Milliseconds + * @returns {string} Nanoseconds as String + */ +function msToNano(ms) { + return String(ms * 1000000); +} + +/** + * System attribute rules for specific span types + * Automatically adds required system attributes based on span type + */ +const systemAttributeRules = { + postgres: { + dataKey: 'pg', + attributes: [{ key: 'db.system', value: 'postgresql' }] + }, + kafka: { + dataKey: 'kafka', + attributes: [{ key: 'messaging.system', value: 'kafka' }] + } +}; + +/** + * Creates OTEL Attributes from Instana Span Data using mapper schema + * @param {Object} data - Instana span data + * @param {string} spanType - Instana span type for context-specific attributes + * @returns {Array} OTEL attributes array + */ +function createAttributes(data, spanType) { + const attributes = []; + const mappings = getOtlpAttributeMappings(); + + if (!data) { + return attributes; + } + + // Add system-specific attributes based on span type + const systemRule = systemAttributeRules[spanType]; + if (systemRule && data[systemRule.dataKey]) { + systemRule.attributes.forEach(attr => { + attributes.push({ key: attr.key, value: { stringValue: attr.value } }); + }); + } + + // Process each data section (http, service, etc.) + Object.keys(data).forEach(dataKey => { + const dataSection = data[dataKey]; + const sectionMappings = mappings[dataKey]; + + if (!sectionMappings || typeof dataSection !== 'object') { + // If no mappings exist for this section, add as-is + if (dataSection !== null && dataSection !== undefined) { + const stringValue = typeof dataSection === 'object' ? JSON.stringify(dataSection) : String(dataSection); + attributes.push({ key: dataKey, value: { stringValue } }); + } + return; + } + + // Apply mappings for this section + Object.keys(dataSection).forEach(field => { + const value = dataSection[field]; + if (value === null || value === undefined) { + return; + } + + const otlpKey = sectionMappings[field] || `${dataKey}.${field}`; + + // Determine value type and format + if (otlpKey === 'http.response.status_code' && typeof value === 'number') { + attributes.push({ key: otlpKey, value: { intValue: value } }); + } else if (typeof value === 'string') { + attributes.push({ key: otlpKey, value: { stringValue: value } }); + } else if (typeof value === 'number') { + attributes.push({ key: otlpKey, value: { intValue: value } }); + } else if (typeof value === 'boolean') { + attributes.push({ key: otlpKey, value: { boolValue: value } }); + } else { + // Convert objects to JSON strings + attributes.push({ key: otlpKey, value: { stringValue: JSON.stringify(value) } }); + } + }); + }); + + return attributes; +} + +/** + * Creates Resource Attributes from Instana "from" field + * @param {Object} from - Instana from object + * @returns {Array} OTEL resource attributes + */ +function createResourceAttributes(from) { + const attributes = []; + + // Standard OTEL Resource Attributes + attributes.push({ + key: 'telemetry.sdk.language', + value: { stringValue: 'nodejs' } + }); + + attributes.push({ + key: 'telemetry.sdk.name', + value: { stringValue: '@instana/collector' } + }); + + // Service Name - use process.title or a default + const serviceName = process.env.SERVICE_NAME; + attributes.push({ + key: 'service.name', + value: { stringValue: serviceName } + }); + + // Use "from" field if present, otherwise cached values + const pid = from && from.e ? from.e : cachedPid; + const hostId = from && from.h ? from.h : cachedHostId; + + // Process PID + if (pid) { + attributes.push({ + key: 'process.pid', + value: { intValue: parseInt(pid, 10) } + }); + } + + // Host Name + if (hostId) { + attributes.push({ + key: 'host.name', + value: { stringValue: hostId } + }); + } + + return attributes; +} + +/** + * Determines the status code based on Error Count + * @param {number} errorCount - Instana error count + * @returns {Object} OTEL status object + */ +function createStatus(errorCount) { + // OTEL Status Code: 0=UNSET, 1=OK, 2=ERROR + if (errorCount > 0) { + return { code: 2 }; // ERROR + } + return { code: 1 }; // OK +} + +/** + * Span name generation rules configuration + * Each rule defines how to generate a span name for a specific span type + */ +const spanNameRules = { + 'node.http.server': { + dataKey: 'http', + template: data => { + const method = data.method || 'HTTP'; + const path = data.path_tpl || data.url || '/'; + return `${method} ${path}`; + } + }, + 'node.http.client': { + dataKey: 'http', + template: data => data.method || 'HTTP' + }, + postgres: { + dataKey: 'pg', + template: data => { + const stmt = data.stmt || ''; + const operation = stmt.split(' ')[0] || 'query'; + const db = data.db || ''; + return `pg.query:${operation} ${db}`.trim(); + } + }, + kafka: { + dataKey: 'kafka', + template: data => { + const access = data.access || 'process'; + const topic = data.service || 'unknown'; + return `${access} ${topic}`; + } + } +}; + +/** + * Generates a descriptive span name based on Instana span data + * @param {Object} instanaSpan - Instana span object + * @returns {string} Descriptive span name + */ +function generateSpanName(instanaSpan) { + const spanType = instanaSpan.n; + const data = instanaSpan.data || {}; + + // Check if we have a rule for this span type + const rule = spanNameRules[spanType]; + if (rule && data[rule.dataKey]) { + return rule.template(data[rule.dataKey]); + } + + // Default: use span type + return spanType || 'unknown'; +} + +/** + * Transforms a single Instana Span to OTEL Span + * @param {Object} instanaSpan - Instana span object + * @returns {Object} OTEL span object + */ +function transformSpan(instanaSpan) { + // Validate required fields + if (typeof instanaSpan.ts !== 'number' || typeof instanaSpan.d !== 'number') { + // Return a minimal valid span if timestamps are missing + return { + traceId: normalizeTraceId(instanaSpan.t || '0'), + spanId: instanaSpan.s || '0', + name: generateSpanName(instanaSpan), + kind: 0, + startTimeUnixNano: '0', + endTimeUnixNano: '0', + attributes: [], + status: { code: 1 } + }; + } + + const otelSpan = { + traceId: normalizeTraceId(instanaSpan.t), + spanId: instanaSpan.s, + name: generateSpanName(instanaSpan), + kind: convertSpanKind(instanaSpan.k, instanaSpan.n, instanaSpan.data), + startTimeUnixNano: msToNano(instanaSpan.ts), + endTimeUnixNano: msToNano(instanaSpan.ts + instanaSpan.d), + attributes: createAttributes(instanaSpan.data, instanaSpan.n), + status: createStatus(instanaSpan.ec || 0) + }; + + // Parent Span ID is optional + if (instanaSpan.p) { + otelSpan.parentSpanId = instanaSpan.p; + } + + return otelSpan; +} + +function normalizeTraceId(traceId) { + const normalized = String(traceId || '0'); + if (normalized.length === 32) { + return normalized; + } + if (normalized.length > 32) { + return normalized.slice(-32); + } + return normalized.padStart(32, '0'); +} + +/** + * Transforms Instana Traces to OTEL Format + * Similar to the transform pattern in mapper.js, this function processes + * Instana spans and converts them to OpenTelemetry format. + * + * @param {Array} instanaTraces - Array of Instana spans + * @returns {Object} OTEL traces object + */ +function transform(instanaTraces) { + if (!Array.isArray(instanaTraces) || instanaTraces.length === 0) { + return { + resourceSpans: [] + }; + } + + // Group Spans by Resource (from field) + const spansByResource = new Map(); + + instanaTraces.forEach(function (instanaSpan) { + // Cache PID and Host-ID from the first span for Metrics + if (instanaSpan.f) { + if (instanaSpan.f.e && !cachedPid) { + setPid(instanaSpan.f.e); + } + if (instanaSpan.f.h && !cachedHostId) { + setHostId(instanaSpan.f.h); + } + } + + const resourceKey = JSON.stringify(instanaSpan.f || {}); + + if (!spansByResource.has(resourceKey)) { + spansByResource.set(resourceKey, { + resource: instanaSpan.f, + spans: [] + }); + } + + spansByResource.get(resourceKey).spans.push(instanaSpan); + }); + + // Create OTEL ResourceSpans + const resourceSpans = Array.from(spansByResource.values()).map(function (group) { + const otelSpans = group.spans.map(transformSpan); + + console.log('-----------------', JSON.stringify(otelSpans)); + + return { + resource: { + attributes: createResourceAttributes(group.resource) + }, + scopeSpans: [ + { + scope: { + name: '@instana/collector', + version: '1.0.0' + }, + spans: otelSpans + } + ] + }; + }); + + return { + resourceSpans: resourceSpans + }; +} + +/** + * Flattens nested objects to a flat object with dot notation + * @param {Object} obj - Nested object + * @param {string} prefix - Prefix for the keys + * @returns {Object} Flat object + */ +function flattenObject(obj, prefix) { + prefix = prefix || ''; + const flattened = {}; + + for (const key in obj) { + if (!obj.hasOwnProperty(key)) { + continue; + } + + const value = obj[key]; + const newKey = prefix ? `${prefix}.${key}` : key; + + if (value === null || value === undefined) { + continue; + } + + if (typeof value === 'object' && !Array.isArray(value)) { + // Recursively flatten nested objects + const nested = flattenObject(value, newKey); + for (const nestedKey in nested) { + if (nested.hasOwnProperty(nestedKey)) { + flattened[nestedKey] = nested[nestedKey]; + } + } + } else if (typeof value === 'number' || typeof value === 'string' || typeof value === 'boolean') { + // Only take primitive values + flattened[newKey] = value; + } + } + + return flattened; +} + +/** + * Transforms Instana Metrics to OTEL Format + * @param {Array|Object} instanaMetrics - Array or object of Instana metrics + * @returns {Object} OTEL metrics object + */ +function transformMetrics(instanaMetrics) { + // If it's an object, convert the values to an array + let metricsArray = instanaMetrics; + + if (!Array.isArray(instanaMetrics)) { + if (!instanaMetrics || typeof instanaMetrics !== 'object') { + return { + resourceMetrics: [] + }; + } + + // Flatten the nested object + const flattenedMetrics = flattenObject(instanaMetrics); + + // Convert flat object to array of Metrics + metricsArray = Object.keys(flattenedMetrics).map(function (key) { + const value = flattenedMetrics[key]; + return { + name: key, + value: value, + timestamp: Date.now(), + unit: '', + from: instanaMetrics.from + }; + }); + } + + if (metricsArray.length === 0) { + return { + resourceMetrics: [] + }; + } + + // Group Metrics by Resource + const metricsByResource = new Map(); + + metricsArray.forEach(function (instanaMetric) { + const resourceKey = JSON.stringify(instanaMetric.from || {}); + + if (!metricsByResource.has(resourceKey)) { + metricsByResource.set(resourceKey, { + resource: instanaMetric.from, + metrics: [] + }); + } + + metricsByResource.get(resourceKey).metrics.push(instanaMetric); + }); + + // Create OTEL ResourceMetrics + const resourceMetrics = Array.from(metricsByResource.values()).map(function (group) { + const otelMetrics = group.metrics.map(function (metric) { + // Determine the metric type based on the value + let metricData; + if (typeof metric.value === 'number') { + metricData = { + sum: { + dataPoints: [ + { + asDouble: metric.value + } + ] + } + }; + } else if (typeof metric.value === 'string') { + // Strings as Gauge with String value (not standard OTLP, but for debugging) + metricData = { + gauge: { + dataPoints: [ + { + asDouble: 0, + attributes: [ + { + key: 'value', + value: { stringValue: metric.value } + } + ] + } + ] + } + }; + } else if (typeof metric.value === 'boolean') { + metricData = { + gauge: { + dataPoints: [ + { + asDouble: metric.value ? 1 : 0 + } + ] + } + }; + } else { + // Fallback for unknown types + metricData = { + sum: { + dataPoints: [ + { + asDouble: 0 + } + ] + } + }; + } + + return { + name: metric.name || 'unknown.metric', + ...metricData + }; + }); + + return { + resource: { + attributes: createResourceAttributes(group.resource) + }, + scopeMetrics: [ + { + scope: { + name: 'instrumentationScope', + version: '13.2' + }, + metrics: otelMetrics + } + ] + }; + }); + + return { + resourceMetrics: resourceMetrics + }; +} + +module.exports = transform; +module.exports.transform = transform; +module.exports.transformTraces = transform; +module.exports.transformMetrics = transformMetrics; +module.exports.setHostId = setHostId; +module.exports.setPid = setPid; + +// Made with Bob diff --git a/packages/core/src/tracing/otlp_mapper/mapper.js b/packages/core/src/tracing/otlp_mapper/mapper.js new file mode 100644 index 0000000000..347e4a82c8 --- /dev/null +++ b/packages/core/src/tracing/otlp_mapper/mapper.js @@ -0,0 +1,191 @@ +/* + * (c) Copyright IBM Corp. 2026 + */ + +'use strict'; + +/** + * OTLP attribute mappings for different span types. + * Maps Instana span data fields to OTLP semantic convention attributes. + * + * Based on OpenTelemetry Semantic Conventions: + * - HTTP: https://opentelemetry.io/docs/specs/semconv/http/ + * - Database: https://opentelemetry.io/docs/specs/semconv/database/ + * - Messaging: https://opentelemetry.io/docs/specs/semconv/messaging/ + * + * @type {Object>} + */ + +/** + * Common database field mappings following OTLP Database Semantic Conventions. + * These mappings apply to all database span types (pg, mysql, mongodb, redis, etc.). + */ +const databaseMappings = { + stmt: 'db.statement', + command: 'db.operation.name', + host: 'net.peer.name', + port: 'net.peer.port', + user: 'db.user', + db: 'db.name', + namespace: 'db.namespace', + collection: 'db.collection.name', + table: 'db.sql.table', + operation: 'db.operation.name', + connection: 'db.connection_string' +}; + +/** + * Database span types that should use the common database mappings. + * Add new database types here as they are instrumented. + */ +const databaseSpanTypes = [ + 'pg', + 'mysql', + 'mongodb', + 'redis', + 'mssql', + 'couchbase', + 'elasticsearch', + 'dynamodb', + 'db2', + 'memcached', + 'mongoose', + 'prisma' +]; + +/** + * Messaging span types that should use the common messaging mappings. + */ +const messagingSpanTypes = ['kafka']; + +const otlpAttributeMappings = { + // HTTP Semantic Conventions + http: { + method: 'http.request.method', + status: 'http.response.status_code', + url: 'url.full', + path: 'url.path', + host: 'server.address', + protocol: 'network.protocol.name', + params: 'url.query', + path_tpl: 'url.template', + error: 'error.type', + status_text: 'http.status_text', + route: 'http.route' + }, + + // Messaging Semantic Conventions (Kafka, etc.) + messaging: { + service: 'messaging.destination.name', + access: 'messaging.operation.type', + operation: 'messaging.operation.type' + } +}; + +/** + * Determines the appropriate mapping category for a given span data key. + * + * @param {string} key - The span data key (e.g., 'pg', 'mysql', 'http', 'kafka') + * @returns {'database' | 'messaging' | 'http' | null} The mapping category + */ +function getMappingCategory(key) { + if (key === 'http') { + return 'http'; + } + if (messagingSpanTypes.includes(key)) { + return 'messaging'; + } + if (databaseSpanTypes.includes(key)) { + return 'database'; + } + return null; +} + +/** + * Gets the appropriate mappings for a given span data key. + * + * @param {string} key - The span data key + * @returns {Object | null} The mappings object or null + */ +function getMappingsForKey(key) { + const category = getMappingCategory(key); + + if (category === 'http') { + return otlpAttributeMappings.http; + } + if (category === 'messaging') { + return otlpAttributeMappings.messaging; + } + if (category === 'database') { + return databaseMappings; + } + + return null; +} + +/** + * Transforms span data fields to OTLP attribute naming while keeping + * the mapper logic separate from the backend field mapping. + * + * @param {import('../../core').InstanaBaseSpan} span + * @returns {import('../../core').InstanaBaseSpan} The transformed span. + */ +module.exports.transform = span => { + if (!span || !span.data) { + return span; + } + + Object.keys(span.data).forEach(key => { + if (typeof span.data[key] !== 'object' || span.data[key] === null) { + return; + } + + const mappings = getMappingsForKey(key); + if (!mappings) { + return; + } + + applyMappings(span.data[key], mappings, key); + }); + + return span; +}; + +/** + * Applies OTLP field mappings to a specific data section. + * + * @param {Record} dataSection + * @param {Object} mappings + * @param {string} sectionKey + */ +function applyMappings(dataSection, mappings, sectionKey) { + Object.keys(dataSection).forEach(internalField => { + const mappedField = mappings[internalField] || `${sectionKey}.${internalField}`; + dataSection[mappedField] = dataSection[internalField]; + delete dataSection[internalField]; + }); +} + +/** + * Returns all OTLP attribute mappings including dynamic database and messaging mappings. + * + * @returns {Object>} All mappings + */ +module.exports.getOtlpAttributeMappings = function () { + /** @type {Object>} */ + const allMappings = { ...otlpAttributeMappings }; + + // Add database mappings for all database span types + databaseSpanTypes.forEach(dbType => { + allMappings[dbType] = databaseMappings; + }); + + // Add messaging mappings for all messaging span types + messagingSpanTypes.forEach(msgType => { + allMappings[msgType] = otlpAttributeMappings.messaging; + }); + + return allMappings; +}; + +// Made with Bob diff --git a/packages/core/src/tracing/spanBuffer.js b/packages/core/src/tracing/spanBuffer.js index f6bf5b19b9..1014cb445d 100644 --- a/packages/core/src/tracing/spanBuffer.js +++ b/packages/core/src/tracing/spanBuffer.js @@ -7,6 +7,7 @@ const tracingMetrics = require('./metrics'); const { transform } = require('./backend_mappers'); +const otlpTransformer = require('./otlpTransformer'); /** @type {import('../core').GenericLogger} */ let logger; @@ -456,10 +457,13 @@ function transmitSpans() { spans = []; batchingBuckets.clear(); + const processedSpans = + process.env.INSTANA_OTLP_FORMAT === 'true' ? otlpTransformer.transform(spansToSend) : spansToSend; + // We restore the content of the spans array if sending them downstream was not successful. We do not restore // batchingBuckets, though. This is deliberate. In the worst case, we might miss some batching opportunities, but // since sending spans downstream will take a few milliseconds, even that will be rare (and it is acceptable). - downstreamConnection.sendSpans(spansToSend, function sendSpans(/** @type {Error} */ error) { + downstreamConnection.sendSpans(processedSpans, function sendSpans(/** @type {Error} */ error) { if (error) { logger.warn(`Failed to transmit spans, will retry in ${transmissionDelay} ms. ${error?.message} ${error?.stack}`); spans = spans.concat(spansToSend); diff --git a/packages/core/test/tracing/otlp_mapper/mapper_test.js b/packages/core/test/tracing/otlp_mapper/mapper_test.js new file mode 100644 index 0000000000..15bff7e526 --- /dev/null +++ b/packages/core/test/tracing/otlp_mapper/mapper_test.js @@ -0,0 +1,354 @@ +/* + * (c) Copyright IBM Corp. 2026 + */ + +'use strict'; + +const expect = require('chai').expect; +const { transform, getOtlpAttributeMappings } = require('../../../src/tracing/otlp_mapper/mapper'); + +describe('tracing/otlp_mapper', () => { + let span; + + describe('Dynamic Database Mappings', () => { + it('should transform PostgreSQL span using common database mappings', () => { + span = { + n: 'pg', + data: { + pg: { + stmt: 'SELECT * FROM users', + host: 'localhost', + port: 5432, + user: 'admin', + db: 'mydb' + } + } + }; + + const result = transform(span); + expect(result.data.pg['db.statement']).to.equal('SELECT * FROM users'); + expect(result.data.pg['net.peer.name']).to.equal('localhost'); + expect(result.data.pg['net.peer.port']).to.equal(5432); + expect(result.data.pg['db.user']).to.equal('admin'); + expect(result.data.pg['db.name']).to.equal('mydb'); + expect(result.data.pg).to.not.have.property('stmt'); + expect(result.data.pg).to.not.have.property('host'); + expect(result.data.pg).to.not.have.property('port'); + expect(result.data.pg).to.not.have.property('user'); + expect(result.data.pg).to.not.have.property('db'); + }); + + it('should transform MySQL span using common database mappings', () => { + span = { + n: 'mysql', + data: { + mysql: { + stmt: 'INSERT INTO orders VALUES (1)', + host: 'db.example.com', + port: 3306, + user: 'root', + db: 'shop' + } + } + }; + + const result = transform(span); + // Note: MySQL uses same fields as PostgreSQL (stmt, host, port, user, db) + expect(result.data.mysql['db.statement']).to.equal('INSERT INTO orders VALUES (1)'); + expect(result.data.mysql['net.peer.name']).to.equal('db.example.com'); + expect(result.data.mysql['net.peer.port']).to.equal(3306); + expect(result.data.mysql['db.user']).to.equal('root'); + expect(result.data.mysql['db.name']).to.equal('shop'); + expect(result.data.mysql).to.not.have.property('stmt'); + expect(result.data.mysql).to.not.have.property('host'); + expect(result.data.mysql).to.not.have.property('port'); + expect(result.data.mysql).to.not.have.property('user'); + expect(result.data.mysql).to.not.have.property('db'); + }); + + it('should transform MongoDB span using common database mappings', () => { + span = { + n: 'mongodb', + data: { + mongodb: { + command: 'find', + namespace: 'mydb.users', + collection: 'users', + host: 'mongo.local', + port: 27017 + } + } + }; + + const result = transform(span); + expect(result.data.mongodb['db.operation.name']).to.equal('find'); + expect(result.data.mongodb['db.namespace']).to.equal('mydb.users'); + expect(result.data.mongodb['db.collection.name']).to.equal('users'); + expect(result.data.mongodb['net.peer.name']).to.equal('mongo.local'); + expect(result.data.mongodb['net.peer.port']).to.equal(27017); + expect(result.data.mongodb).to.not.have.property('command'); + expect(result.data.mongodb).to.not.have.property('namespace'); + expect(result.data.mongodb).to.not.have.property('collection'); + expect(result.data.mongodb).to.not.have.property('host'); + expect(result.data.mongodb).to.not.have.property('port'); + }); + + it('should transform Redis span using common database mappings', () => { + span = { + n: 'redis', + data: { + redis: { + command: 'GET', + connection: 'redis://localhost:6379' + } + } + }; + + const result = transform(span); + expect(result.data.redis['db.operation.name']).to.equal('GET'); + expect(result.data.redis['db.connection_string']).to.equal('redis://localhost:6379'); + expect(result.data.redis).to.not.have.property('command'); + expect(result.data.redis).to.not.have.property('connection'); + }); + + it('should handle unmapped database fields with section prefix', () => { + span = { + n: 'pg', + data: { + pg: { + stmt: 'SELECT 1', + custom_field: 'custom_value' + } + } + }; + + const result = transform(span); + expect(result.data.pg['db.statement']).to.equal('SELECT 1'); + expect(result.data.pg['pg.custom_field']).to.equal('custom_value'); + expect(result.data.pg).to.not.have.property('stmt'); + expect(result.data.pg).to.not.have.property('custom_field'); + }); + }); + + describe('HTTP Mappings', () => { + it('should transform HTTP span with specific HTTP mappings', () => { + span = { + n: 'node.http.server', + data: { + http: { + method: 'GET', + url: '/api/users', + host: 'localhost', + status: 200, + path: '/api/users', + protocol: 'HTTP/1.1' + } + } + }; + + const result = transform(span); + expect(result.data.http['http.request.method']).to.equal('GET'); + expect(result.data.http['url.full']).to.equal('/api/users'); + expect(result.data.http['server.address']).to.equal('localhost'); + expect(result.data.http['http.response.status_code']).to.equal(200); + expect(result.data.http['url.path']).to.equal('/api/users'); + expect(result.data.http['network.protocol.name']).to.equal('HTTP/1.1'); + expect(result.data.http).to.not.have.property('method'); + expect(result.data.http).to.not.have.property('url'); + expect(result.data.http).to.not.have.property('host'); + expect(result.data.http).to.not.have.property('status'); + expect(result.data.http).to.not.have.property('path'); + expect(result.data.http).to.not.have.property('protocol'); + }); + }); + + describe('Messaging Mappings', () => { + it('should transform Kafka span using messaging mappings', () => { + span = { + n: 'kafka', + data: { + kafka: { + service: 'my-topic', + access: 'produce' + } + } + }; + + const result = transform(span); + expect(result.data.kafka['messaging.destination.name']).to.equal('my-topic'); + expect(result.data.kafka['messaging.operation.type']).to.equal('produce'); + expect(result.data.kafka).to.not.have.property('service'); + expect(result.data.kafka).to.not.have.property('access'); + }); + }); + + describe('getOtlpAttributeMappings', () => { + it('should return mappings for all database types', () => { + const mappings = getOtlpAttributeMappings(); + + // Check that all database types have mappings + expect(mappings).to.have.property('pg'); + expect(mappings).to.have.property('mysql'); + expect(mappings).to.have.property('mongodb'); + expect(mappings).to.have.property('redis'); + expect(mappings).to.have.property('mssql'); + expect(mappings).to.have.property('couchbase'); + expect(mappings).to.have.property('elasticsearch'); + expect(mappings).to.have.property('dynamodb'); + expect(mappings).to.have.property('db2'); + expect(mappings).to.have.property('memcached'); + expect(mappings).to.have.property('mongoose'); + expect(mappings).to.have.property('prisma'); + + // Verify they all use the same database mappings + expect(mappings.pg).to.deep.equal(mappings.mysql); + expect(mappings.mysql).to.deep.equal(mappings.mongodb); + expect(mappings.redis).to.deep.equal(mappings.pg); + }); + + it('should return HTTP and messaging mappings', () => { + const mappings = getOtlpAttributeMappings(); + + expect(mappings).to.have.property('http'); + expect(mappings).to.have.property('kafka'); + expect(mappings.http).to.have.property('method'); + expect(mappings.kafka).to.have.property('service'); + }); + }); + + describe('HTTP OTLP Mappings (Integration with Backend Mapper)', () => { + it('should transform backend-mapped http span fields to OTLP http attributes', () => { + span = { + n: 'node.http.server', + data: { + http: { + method: 'GET', + url: '/api/users', + host: 'localhost', + status: 200 + } + } + }; + + const result = transform(span); + + // New OTel semantic conventions + expect(result.data.http['http.request.method']).to.equal('GET'); + expect(result.data.http['url.full']).to.equal('/api/users'); + expect(result.data.http['server.address']).to.equal('localhost'); + expect(result.data.http['http.response.status_code']).to.equal(200); + + expect(result.data.http).to.not.have.property('method'); + expect(result.data.http).to.not.have.property('url'); + expect(result.data.http).to.not.have.property('host'); + expect(result.data.http).to.not.have.property('status'); + }); + + it('should keep unmapped backend http fields as section-prefixed OTLP attributes', () => { + span = { + n: 'node.http.server', + data: { + http: { + method: 'POST', + url: '/orders', + host: 'service.local', + custom_header: 'x-test' + } + } + }; + + const result = transform(span); + + // New OTel semantic conventions + expect(result.data.http['http.request.method']).to.equal('POST'); + expect(result.data.http['url.full']).to.equal('/orders'); + expect(result.data.http['server.address']).to.equal('service.local'); + expect(result.data.http['http.custom_header']).to.equal('x-test'); + expect(result.data.http).to.not.have.property('method'); + expect(result.data.http).to.not.have.property('url'); + expect(result.data.http).to.not.have.property('host'); + expect(result.data.http).to.not.have.property('custom_header'); + }); + + it('should map additional HTTP fields according to OTel semantic conventions', () => { + span = { + n: 'node.http.client', + data: { + http: { + method: 'GET', + url: 'https://api.example.com/users?page=1', + path: '/users', + params: 'page=1', + protocol: 'HTTP/1.1', + path_tpl: '/users', + error: 'timeout' + } + } + }; + + const result = transform(span); + + // Verify all new mappings + expect(result.data.http['http.request.method']).to.equal('GET'); + expect(result.data.http['url.full']).to.equal('https://api.example.com/users?page=1'); + expect(result.data.http['url.path']).to.equal('/users'); + expect(result.data.http['url.query']).to.equal('page=1'); + expect(result.data.http['network.protocol.name']).to.equal('HTTP/1.1'); + expect(result.data.http['url.template']).to.equal('/users'); + expect(result.data.http['error.type']).to.equal('timeout'); + + // Verify old fields are removed + expect(result.data.http).to.not.have.property('method'); + expect(result.data.http).to.not.have.property('url'); + expect(result.data.http).to.not.have.property('path'); + expect(result.data.http).to.not.have.property('params'); + expect(result.data.http).to.not.have.property('protocol'); + expect(result.data.http).to.not.have.property('path_tpl'); + expect(result.data.http).to.not.have.property('error'); + }); + }); + + describe('Edge Cases', () => { + it('should return span unchanged if data is null', () => { + span = { n: 'test', data: null }; + const result = transform(span); + expect(result).to.equal(span); + }); + + it('should return span unchanged if span is null', () => { + const result = transform(null); + expect(result).to.equal(null); + }); + + it('should skip non-object data sections', () => { + span = { + n: 'test', + data: { + pg: { stmt: 'SELECT 1' }, + stringField: 'value', + numberField: 123 + } + }; + + const result = transform(span); + expect(result.data.pg['db.statement']).to.equal('SELECT 1'); + expect(result.data.pg).to.not.have.property('stmt'); + expect(result.data.stringField).to.equal('value'); + expect(result.data.numberField).to.equal(123); + }); + + it('should handle spans with no matching mappings', () => { + span = { + n: 'custom', + data: { + custom: { field: 'value' } + } + }; + + const result = transform(span); + expect(result.data.custom.field).to.equal('value'); + }); + }); +}); + +// Made with Bob diff --git a/packages/core/test/tracing/spanBuffer_test.js b/packages/core/test/tracing/spanBuffer_test.js index aa629b8e30..26a1611d94 100644 --- a/packages/core/test/tracing/spanBuffer_test.js +++ b/packages/core/test/tracing/spanBuffer_test.js @@ -574,9 +574,32 @@ describe('tracing/spanBuffer', () => { }); describe('when applying span transformations', () => { - beforeEach(() => spanBuffer.activate()); + before(() => { + downstreamConnectionStub = { + sendSpans: sinon.stub() + }; + + spanBuffer.init( + { + logger: testUtils.createFakeLogger(), + tracing: { + maxBufferedSpans: 1000, + forceTransmissionStartingAt: 500, + transmissionDelay: 1000, + spanBatchingEnabled: false + } + }, + downstreamConnectionStub + ); + }); + + beforeEach(() => { + spanBuffer.activate(); + downstreamConnectionStub.sendSpans.resetHistory(); + }); afterEach(() => spanBuffer.deactivate()); + const span = { t: '1234567803', s: '1234567892', @@ -605,6 +628,71 @@ describe('tracing/spanBuffer', () => { expect(spans).to.have.lengthOf(1); expect(span).to.deep.equal(span); }); + + it('should transform http spans before buffering and convert the transmitted batch to OTLP when INSTANA_OTLP_FORMAT is true', () => { + const previousValue = process.env.INSTANA_OTLP_FORMAT; + process.env.INSTANA_OTLP_FORMAT = 'true'; + downstreamConnectionStub.sendSpans.resetHistory(); + spanBuffer.setTransmitImmediate(true); + + const httpSpan = { + t: '1234567803', + s: '1234567892', + p: '1234567891', + n: 'node.http.server', + k: 1, + f: { + e: '45543', + h: 'localhost' + }, + ts: timestamp(Date.now()), + d: 25, + ec: 0, + data: { + http: { + operation: 'GET', + endpoints: '/orders', + connection: 'localhost', + status: 200 + } + } + }; + + spanBuffer.addSpan(httpSpan); + + expect(downstreamConnectionStub.sendSpans.calledOnce).to.be.true; + const sentPayload = downstreamConnectionStub.sendSpans.getCall(0).args[0]; + const sentSpan = sentPayload.resourceSpans[0].scopeSpans[0].spans[0]; + + expect(sentSpan.traceId).to.have.lengthOf(32); + expect(sentSpan.name).to.equal('GET /orders'); + expect(sentSpan.kind).to.equal(2); + expect(sentSpan.attributes).to.deep.include.members([ + { + key: 'http.request.method', + value: { stringValue: 'GET' } + }, + { + key: 'url.full', + value: { stringValue: '/orders' } + }, + { + key: 'server.address', + value: { stringValue: 'localhost' } + }, + { + key: 'http.response.status_code', + value: { intValue: 200 } + } + ]); + + spanBuffer.setTransmitImmediate(false); + if (previousValue === undefined) { + delete process.env.INSTANA_OTLP_FORMAT; + } else { + process.env.INSTANA_OTLP_FORMAT = previousValue; + } + }); }); });