Skip to content

Commit f991f40

Browse files
committed
Initial commit
0 parents  commit f991f40

7 files changed

Lines changed: 245 additions & 0 deletions

File tree

.gitignore

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
/target
2+
/Cargo.lock
3+
.idea/

Cargo.toml

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
[package]
2+
name = "ddtrace"
3+
version = "0.0.1"
4+
edition = "2021"
5+
6+
[features]
7+
axum = ["dep:axum", "dep:axum-tracing-opentelemetry"]
8+
9+
[dependencies]
10+
axum = { version = "^0.6.10", optional = true }
11+
axum-tracing-opentelemetry = { version = "^0.10.0", optional = true }
12+
chrono = "^0.4.24"
13+
opentelemetry = { version = "^0.18.0", features = ["rt-tokio"] }
14+
opentelemetry-datadog = "^0.6.0"
15+
opentelemetry-otlp = { version = "^0.11.0" }
16+
serde = { version = "^1.0.156", features = ["derive"] }
17+
serde_json = "^1.0.95"
18+
tracing = "^0.1.37"
19+
tracing-opentelemetry = "^0.18.0"
20+
tracing-serde = "^0.1.3"
21+
tracing-subscriber = { version = "^0.3.16", features = ["env-filter", "json"] }

README.md

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
This repository is home to a Rust crate `ddtrace` with various Datadog
2+
utilities for tracing and logging in Rust.
3+
4+
# Background
5+
6+
Datadog has official support for Python, which includes various SDKs and
7+
other utilities (such as the Python `ddtrace` library)
8+
for tracing and logging in Python applications.
9+
10+
They don't have similar support for Rust. However, they do support the
11+
[OpenTelemetry](https://opentelemetry.io/) format for both logs and traces.
12+
This crate contains the necessary glue to bridge the gap between OpenTelemetry
13+
and Datadog.
14+
15+
# Features
16+
## Tracing
17+
For traces, the official Datadog agent
18+
[can ingest OTel trace data](https://docs.datadoghq.com/opentelemetry/)
19+
with the correct environment variable settings. The traces can be sent
20+
via either HTTP or gRPC. More information on this can be found here:
21+
https://docs.datadoghq.com/opentelemetry/otlp_ingest_in_the_agent/?tab=docker
22+
23+
OpenTelemetry has an official Rust crate with extensions for major
24+
formats/providers. This includes a datadog exporter. We have found
25+
this exporter to be less reliable than the standard OTel exporter
26+
sending data to the OTel endpoint of the Datadog agent, though.
27+
28+
This library provides utilities to set up the Rust `tracing` crate
29+
for sending data to the agent in the correct way.
30+
31+
## Logging
32+
Datadog can ingest OpenTelemetry logs with two caveats -
33+
it expects the `dd.trace_id` and `dd.span_id` attributes
34+
to be set, and it expects a slightly different format for
35+
trace ID.
36+
37+
This crate contains a JSON formatter layer that also correctly
38+
transform the trace ID to the Datadog native format.
39+
40+
## Propagation
41+
The Python library takes care of propagation of the trace context automatically.
42+
Unfortunately, we need to do this manually in Rust. There are many protocols and
43+
corresponding libraries we could support, but the main one is HTTP requests.
44+
45+
In Rust, `reqwest` is the most commonly used HTTP client crate. We provide a
46+
`reqwest` middleware that injects the necessary headers using the Datadog native
47+
propagation standard (common alternatives would be Jaeger and B3, more on this:
48+
https://opentelemetry.io/docs/reference/specification/context/api-propagators/#propagators-distribution).
49+
50+
## Axum Support
51+
The trace context propagated from other services needs to be extracted and injected
52+
into the propagator. For `axum`, our choice of HTTP API framework, a third-party crate
53+
exists that supports this: https://github.com/davidB/axum-tracing-opentelemetry.
54+
55+
We re-expose this library for convenience.
56+
57+
# Full Example
58+
59+
TODO

src/formatter.rs

Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
use std::io;
2+
3+
use chrono::Utc;
4+
use opentelemetry::trace::{SpanId, TraceContextExt, TraceId};
5+
use serde::ser::{SerializeMap, Serializer as _};
6+
use tracing::{Event, Subscriber};
7+
use tracing_opentelemetry::OtelData;
8+
use tracing_serde::fields::AsMap;
9+
use tracing_serde::AsSerde;
10+
use tracing_subscriber::fmt::format::Writer;
11+
use tracing_subscriber::fmt::{FmtContext, FormatEvent, FormatFields};
12+
use tracing_subscriber::registry::{LookupSpan, SpanRef};
13+
14+
pub struct TraceInfo {
15+
pub trace_id: String,
16+
pub span_id: String,
17+
}
18+
19+
fn convert_trace_id(id: TraceId) -> String {
20+
let bytes = &id.to_bytes()[std::mem::size_of::<u64>()..std::mem::size_of::<u128>()];
21+
let dd_id = u64::from_be_bytes(bytes.try_into().unwrap());
22+
23+
dd_id.to_string()
24+
}
25+
26+
fn convert_span_id(id: SpanId) -> String {
27+
let dd_id = u64::from_be_bytes(id.to_bytes().try_into().unwrap());
28+
29+
dd_id.to_string()
30+
}
31+
32+
fn lookup_trace_info<S>(span_ref: &SpanRef<S>) -> Option<TraceInfo>
33+
where
34+
S: Subscriber + for<'a> LookupSpan<'a>,
35+
{
36+
span_ref.extensions().get::<OtelData>().map(|o| TraceInfo {
37+
trace_id: convert_trace_id(o.parent_cx.span().span_context().trace_id()),
38+
span_id: convert_span_id(o.builder.span_id.unwrap_or(SpanId::INVALID)),
39+
})
40+
}
41+
42+
// mostly stolen from here: https://github.com/tokio-rs/tracing/issues/1531
43+
pub struct TraceIdFormat;
44+
45+
impl<S, N> FormatEvent<S, N> for TraceIdFormat
46+
where
47+
S: Subscriber + for<'lookup> LookupSpan<'lookup>,
48+
N: for<'writer> FormatFields<'writer> + 'static,
49+
{
50+
fn format_event(
51+
&self,
52+
ctx: &FmtContext<'_, S, N>,
53+
mut writer: Writer<'_>,
54+
event: &Event<'_>,
55+
) -> std::fmt::Result
56+
where
57+
S: Subscriber + for<'a> LookupSpan<'a>,
58+
{
59+
let meta = event.metadata();
60+
61+
let mut visit = || {
62+
let mut serializer = serde_json::Serializer::new(WriteAdaptor::new(&mut writer));
63+
let mut serializer = serializer.serialize_map(None)?;
64+
serializer.serialize_entry("timestamp", &Utc::now().to_rfc3339())?;
65+
serializer.serialize_entry("level", &meta.level().as_serde())?;
66+
serializer.serialize_entry("fields", &event.field_map())?;
67+
serializer.serialize_entry("target", meta.target())?;
68+
69+
if let Some(ref span_ref) = ctx.lookup_current() {
70+
if let Some(trace_info) = lookup_trace_info(span_ref) {
71+
serializer.serialize_entry("dd.span_id", &trace_info.span_id)?;
72+
serializer.serialize_entry("dd.trace_id", &trace_info.trace_id)?;
73+
}
74+
}
75+
76+
serializer.end()
77+
};
78+
79+
visit().map_err(|_| std::fmt::Error)?;
80+
writeln!(writer)
81+
}
82+
}
83+
84+
pub struct WriteAdaptor<'a> {
85+
fmt_write: &'a mut dyn std::fmt::Write,
86+
}
87+
88+
impl<'a> WriteAdaptor<'a> {
89+
pub fn new(fmt_write: &'a mut dyn std::fmt::Write) -> Self {
90+
Self { fmt_write }
91+
}
92+
}
93+
94+
impl<'a> io::Write for WriteAdaptor<'a> {
95+
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
96+
let s =
97+
std::str::from_utf8(buf).map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
98+
99+
self.fmt_write
100+
.write_str(s)
101+
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
102+
103+
Ok(s.as_bytes().len())
104+
}
105+
106+
fn flush(&mut self) -> io::Result<()> {
107+
Ok(())
108+
}
109+
}

src/lib.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
mod formatter;
2+
mod propagator;
3+
mod tracer;
4+
5+
pub use formatter::TraceIdFormat;
6+
pub use propagator::set_global_propagator;
7+
pub use tracer::{build_tracer, build_layer};
8+
pub use opentelemetry::trace::{TraceError, TraceResult};
9+
10+
#[cfg(feature = "axum")]
11+
pub use axum_tracing_opentelemetry::opentelemetry_tracing_layer;

src/propagator.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
pub fn set_global_propagator() {
2+
let propagator = opentelemetry_datadog::DatadogPropagator::new();
3+
opentelemetry::global::set_text_map_propagator(propagator);
4+
}

src/tracer.rs

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
use std::time::Duration;
2+
use opentelemetry::KeyValue;
3+
use opentelemetry::trace::{TraceResult};
4+
use opentelemetry::sdk::{Resource, trace};
5+
use opentelemetry::sdk::trace::{RandomIdGenerator, Sampler, Tracer};
6+
use opentelemetry_otlp::WithExportConfig;
7+
use tracing::Subscriber;
8+
use tracing_opentelemetry::{OpenTelemetryLayer, PreSampledTracer};
9+
use tracing_subscriber::registry::LookupSpan;
10+
11+
pub fn build_tracer(service_name: &str) -> TraceResult<Tracer> {
12+
let exporter = opentelemetry_otlp::new_exporter()
13+
.tonic()
14+
.with_timeout(Duration::from_secs(3));
15+
16+
opentelemetry_otlp::new_pipeline()
17+
.tracing()
18+
.with_trace_config(
19+
trace::config()
20+
.with_sampler(Sampler::AlwaysOn)
21+
.with_resource(Resource::new(vec![KeyValue::new(
22+
"service.name",
23+
service_name.to_string(),
24+
)]))
25+
.with_id_generator(RandomIdGenerator::default()),
26+
)
27+
.with_exporter(exporter)
28+
.install_batch(opentelemetry::runtime::Tokio)
29+
}
30+
31+
pub fn build_layer<S>(service_name: &str) -> TraceResult<OpenTelemetryLayer<S, Tracer>>
32+
where
33+
Tracer: opentelemetry::trace::Tracer + PreSampledTracer + 'static,
34+
S: Subscriber + for<'span> LookupSpan<'span>,
35+
{
36+
let tracer = build_tracer(service_name)?;
37+
Ok(tracing_opentelemetry::layer().with_tracer(tracer))
38+
}

0 commit comments

Comments
 (0)