Skip to content

Commit 4d1e290

Browse files
committed
Fix HttpClient5 plugin injecting headers into ClickHouse HTTP requests
ClickHouse HTTP interface (port 8123) rejects unknown headers, causing HTTP 400 errors when SkyWalking Java agent injects sw8 propagation headers. Introduce PROPAGATION_EXCLUDE_PORTS config (default "8123") and update HttpClientDoExecuteInterceptor to skip tracing for excluded ports. Requests to other ports continue to be traced normally. Verified with ClickHouse JDBC queries.
1 parent f750006 commit 4d1e290

4 files changed

Lines changed: 343 additions & 1 deletion

File tree

CHANGES.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ Release Notes.
77

88
* Added support for Lettuce reactive Redis commands.
99
* Add Spring AI 1.x plugin and GenAI layer.
10+
* Fix httpclient-5.x plugin injecting sw8 propagation headers into ClickHouse HTTP requests (port 8123), causing HTTP 400. Add `PROPAGATION_EXCLUDE_PORTS` config to skip header injection for specified ports.
1011

1112
All issues and pull requests are [here](https://github.com/apache/skywalking/milestone/249?closed=1)
1213

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*
17+
*/
18+
19+
package org.apache.skywalking.apm.plugin.httpclient.v5;
20+
21+
import org.apache.skywalking.apm.agent.core.boot.PluginConfig;
22+
23+
public class HttpClient5PluginConfig {
24+
public static class Plugin {
25+
@PluginConfig(root = HttpClient5PluginConfig.class)
26+
public static class HttpClient5 {
27+
/**
28+
* Comma-separated list of destination ports whose HTTP requests
29+
* should NOT have SkyWalking propagation headers injected.
30+
*
31+
* <p>Some HTTP-based database protocols (e.g. ClickHouse on port 8123)
32+
* reject requests that contain unknown HTTP headers, returning HTTP 400.
33+
* Adding such ports here prevents the agent from injecting the {@code sw8}
34+
* tracing headers into those outbound requests while leaving all other
35+
* HTTP calls fully traced.
36+
*
37+
* <p>Default: {@code "8123"} (ClickHouse HTTP interface).
38+
*
39+
* <p>Example – also exclude port 9200 (Elasticsearch):
40+
* {@code plugin.httpclient5.PROPAGATION_EXCLUDE_PORTS=8123,9200}
41+
*/
42+
public static String PROPAGATION_EXCLUDE_PORTS = "8123";
43+
}
44+
}
45+
}

apm-sniffer/apm-sdk-plugin/httpclient-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/httpclient/v5/HttpClientDoExecuteInterceptor.java

Lines changed: 60 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,12 +37,23 @@
3737
import java.lang.reflect.Method;
3838
import java.net.MalformedURLException;
3939
import java.net.URL;
40+
import java.util.Arrays;
41+
import java.util.Collections;
42+
import java.util.Set;
43+
import java.util.stream.Collectors;
4044

4145
public abstract class HttpClientDoExecuteInterceptor implements InstanceMethodsAroundInterceptor {
4246
private static final String ERROR_URI = "/_blank";
4347

4448
private static final ILog LOGGER = LogManager.getLogger(HttpClientDoExecuteInterceptor.class);
4549

50+
/**
51+
* Lazily-resolved, immutable set of ports that must not receive SkyWalking
52+
* propagation headers. Built once from
53+
* {@link HttpClient5PluginConfig.Plugin.HttpClient5#PROPAGATION_EXCLUDE_PORTS}.
54+
*/
55+
private volatile Set<Integer> excludePortsCache;
56+
4657
@Override
4758
public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
4859
MethodInterceptResult result) throws Throwable {
@@ -77,7 +88,55 @@ public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allAr
7788

7889
protected boolean skipIntercept(EnhancedInstance objInst, Method method, Object[] allArguments,
7990
Class<?>[] argumentsTypes) {
80-
return allArguments[1] == null || getHttpHost(objInst, method, allArguments, argumentsTypes) == null;
91+
if (allArguments[1] == null) {
92+
return true;
93+
}
94+
HttpHost host = getHttpHost(objInst, method, allArguments, argumentsTypes);
95+
if (host == null) {
96+
return true;
97+
}
98+
return isExcludedPort(host.getPort());
99+
}
100+
101+
/**
102+
* Returns {@code true} when {@code port} is listed in
103+
* {@link HttpClient5PluginConfig.Plugin.HttpClient5#PROPAGATION_EXCLUDE_PORTS}.
104+
*
105+
* <p>The config value is parsed lazily and cached so that it is read after
106+
* the agent has fully initialised its configuration subsystem.
107+
*/
108+
private boolean isExcludedPort(int port) {
109+
if (port <= 0) {
110+
return false;
111+
}
112+
if (excludePortsCache == null) {
113+
synchronized (this) {
114+
if (excludePortsCache == null) {
115+
excludePortsCache = parseExcludePorts(
116+
HttpClient5PluginConfig.Plugin.HttpClient5.PROPAGATION_EXCLUDE_PORTS);
117+
}
118+
}
119+
}
120+
return excludePortsCache.contains(port);
121+
}
122+
123+
private static Set<Integer> parseExcludePorts(String raw) {
124+
if (raw == null || raw.trim().isEmpty()) {
125+
return Collections.emptySet();
126+
}
127+
return Arrays.stream(raw.split(","))
128+
.map(String::trim)
129+
.filter(s -> !s.isEmpty())
130+
.map(s -> {
131+
try {
132+
return Integer.parseInt(s);
133+
} catch (NumberFormatException e) {
134+
LOGGER.warn("Ignoring invalid port in PROPAGATION_EXCLUDE_PORTS: {}", s);
135+
return -1;
136+
}
137+
})
138+
.filter(p -> p > 0)
139+
.collect(Collectors.toSet());
81140
}
82141

83142
protected abstract HttpHost getHttpHost(EnhancedInstance objInst, Method method, Object[] allArguments,
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,237 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*
17+
*/
18+
19+
package org.apache.skywalking.apm.plugin.httpclient.v5;
20+
21+
import org.apache.hc.core5.http.ClassicHttpRequest;
22+
import org.apache.hc.core5.http.ClassicHttpResponse;
23+
import org.apache.hc.core5.http.HttpHost;
24+
import org.apache.skywalking.apm.agent.core.boot.ServiceManager;
25+
import org.apache.skywalking.apm.agent.core.context.trace.TraceSegment;
26+
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
27+
import org.apache.skywalking.apm.agent.test.tools.AgentServiceRule;
28+
import org.apache.skywalking.apm.agent.test.tools.SegmentStorage;
29+
import org.apache.skywalking.apm.agent.test.tools.SegmentStoragePoint;
30+
import org.apache.skywalking.apm.agent.test.tools.TracingSegmentRunner;
31+
import org.junit.Before;
32+
import org.junit.Rule;
33+
import org.junit.Test;
34+
import org.junit.runner.RunWith;
35+
import org.mockito.Mock;
36+
import org.mockito.junit.MockitoJUnit;
37+
import org.mockito.junit.MockitoRule;
38+
39+
import java.net.URI;
40+
import java.util.List;
41+
42+
import static org.hamcrest.CoreMatchers.is;
43+
import static org.hamcrest.MatcherAssert.assertThat;
44+
import static org.mockito.ArgumentMatchers.anyString;
45+
import static org.mockito.Mockito.never;
46+
import static org.mockito.Mockito.verify;
47+
import static org.mockito.Mockito.when;
48+
49+
/**
50+
* Verifies that requests to ports listed in
51+
* {@link HttpClient5PluginConfig.Plugin.HttpClient5#PROPAGATION_EXCLUDE_PORTS}
52+
* are silently skipped (no span created, no {@code sw8} header injected).
53+
*
54+
* <p>This regression-covers the ClickHouse HTTP-interface issue: ClickHouse
55+
* listens on port 8123 and rejects HTTP requests that carry unknown headers
56+
* (such as the SkyWalking {@code sw8} propagation header), responding with
57+
* HTTP 400 Bad Request. By excluding port 8123 the agent leaves those
58+
* requests untouched while continuing to trace all other HTTP calls.
59+
*/
60+
@RunWith(TracingSegmentRunner.class)
61+
public class HttpClientPropagationExcludePortTest {
62+
63+
@SegmentStoragePoint
64+
private SegmentStorage segmentStorage;
65+
66+
@Rule
67+
public AgentServiceRule agentServiceRule = new AgentServiceRule();
68+
@Rule
69+
public MockitoRule rule = MockitoJUnit.rule();
70+
71+
@Mock
72+
private HttpHost clickHouseHost; // port 8123 – should be excluded
73+
@Mock
74+
private HttpHost regularHost; // port 8080 – should be traced
75+
@Mock
76+
private ClassicHttpRequest request;
77+
@Mock
78+
private ClassicHttpResponse httpResponse;
79+
@Mock
80+
private EnhancedInstance enhancedInstance;
81+
82+
private HttpClientDoExecuteInterceptor internalInterceptor;
83+
private HttpClientDoExecuteInterceptor minimalInterceptor;
84+
85+
private Object[] clickHouseArgs;
86+
private Object[] regularArgs;
87+
private Class<?>[] argumentsType;
88+
89+
@Before
90+
public void setUp() throws Exception {
91+
ServiceManager.INSTANCE.boot();
92+
93+
// Set the exclusion list to the default (includes 8123)
94+
HttpClient5PluginConfig.Plugin.HttpClient5.PROPAGATION_EXCLUDE_PORTS = "8123";
95+
96+
internalInterceptor = new InternalClientDoExecuteInterceptor();
97+
minimalInterceptor = new MinimalClientDoExecuteInterceptor();
98+
99+
when(httpResponse.getCode()).thenReturn(200);
100+
101+
// ClickHouse-like host on port 8123
102+
when(clickHouseHost.getHostName()).thenReturn("clickhouse-server");
103+
when(clickHouseHost.getSchemeName()).thenReturn("http");
104+
when(clickHouseHost.getPort()).thenReturn(8123);
105+
106+
// Regular application host on port 8080
107+
when(regularHost.getHostName()).thenReturn("my-service");
108+
when(regularHost.getSchemeName()).thenReturn("http");
109+
when(regularHost.getPort()).thenReturn(8080);
110+
111+
when(request.getUri()).thenReturn(new URI("http://my-service:8080/api/ping"));
112+
when(request.getMethod()).thenReturn("GET");
113+
114+
clickHouseArgs = new Object[]{clickHouseHost, request};
115+
regularArgs = new Object[]{regularHost, request};
116+
argumentsType = new Class[]{HttpHost.class, ClassicHttpRequest.class};
117+
}
118+
119+
// -----------------------------------------------------------------------
120+
// InternalHttpClient path
121+
// -----------------------------------------------------------------------
122+
123+
/**
124+
* Requests to port 8123 via {@code InternalHttpClient} must not produce a
125+
* trace segment and must NOT set any propagation header on the request.
126+
*
127+
* <p>Before this fix the agent injected {@code sw8} (and two companion
128+
* headers) into every outbound request regardless of the destination port.
129+
* ClickHouse interprets unknown headers as malformed requests and returns
130+
* HTTP 400, making all JDBC queries fail while the SkyWalking agent is
131+
* attached.
132+
*/
133+
@Test
134+
public void internalClient_requestToExcludedPort_noSpanAndNoHeaderInjected() throws Throwable {
135+
internalInterceptor.beforeMethod(enhancedInstance, null, clickHouseArgs, argumentsType, null);
136+
internalInterceptor.afterMethod(enhancedInstance, null, clickHouseArgs, argumentsType, httpResponse);
137+
138+
List<TraceSegment> segments = segmentStorage.getTraceSegments();
139+
assertThat("No trace segment should be created for excluded port", segments.size(), is(0));
140+
verify(request, never()).setHeader(anyString(), anyString());
141+
}
142+
143+
/**
144+
* Requests to a non-excluded port via {@code InternalHttpClient} must still
145+
* be traced and have propagation headers injected.
146+
*/
147+
@Test
148+
public void internalClient_requestToRegularPort_spanCreatedAndHeadersInjected() throws Throwable {
149+
internalInterceptor.beforeMethod(enhancedInstance, null, regularArgs, argumentsType, null);
150+
internalInterceptor.afterMethod(enhancedInstance, null, regularArgs, argumentsType, httpResponse);
151+
152+
List<TraceSegment> segments = segmentStorage.getTraceSegments();
153+
assertThat("A trace segment must be created for a non-excluded port", segments.size(), is(1));
154+
// sw8, sw8-correlation, sw8-x are the 3 propagation headers
155+
verify(request, org.mockito.Mockito.atLeastOnce()).setHeader(anyString(), anyString());
156+
}
157+
158+
// -----------------------------------------------------------------------
159+
// MinimalHttpClient path
160+
// -----------------------------------------------------------------------
161+
162+
/**
163+
* Same assertion for the {@code MinimalHttpClient} code path.
164+
*/
165+
@Test
166+
public void minimalClient_requestToExcludedPort_noSpanAndNoHeaderInjected() throws Throwable {
167+
minimalInterceptor.beforeMethod(enhancedInstance, null, clickHouseArgs, argumentsType, null);
168+
minimalInterceptor.afterMethod(enhancedInstance, null, clickHouseArgs, argumentsType, httpResponse);
169+
170+
List<TraceSegment> segments = segmentStorage.getTraceSegments();
171+
assertThat("No trace segment should be created for excluded port", segments.size(), is(0));
172+
verify(request, never()).setHeader(anyString(), anyString());
173+
}
174+
175+
/**
176+
* Normal (non-excluded) port via {@code MinimalHttpClient} must still be
177+
* traced.
178+
*/
179+
@Test
180+
public void minimalClient_requestToRegularPort_spanCreatedAndHeadersInjected() throws Throwable {
181+
minimalInterceptor.beforeMethod(enhancedInstance, null, regularArgs, argumentsType, null);
182+
minimalInterceptor.afterMethod(enhancedInstance, null, regularArgs, argumentsType, httpResponse);
183+
184+
List<TraceSegment> segments = segmentStorage.getTraceSegments();
185+
assertThat("A trace segment must be created for a non-excluded port", segments.size(), is(1));
186+
verify(request, org.mockito.Mockito.atLeastOnce()).setHeader(anyString(), anyString());
187+
}
188+
189+
// -----------------------------------------------------------------------
190+
// Configuration edge cases
191+
// -----------------------------------------------------------------------
192+
193+
/**
194+
* When {@code PROPAGATION_EXCLUDE_PORTS} is cleared (empty string), every
195+
* port – including 8123 – must be traced normally.
196+
*/
197+
@Test
198+
public void whenExcludePortsEmpty_allPortsAreTraced() throws Throwable {
199+
HttpClient5PluginConfig.Plugin.HttpClient5.PROPAGATION_EXCLUDE_PORTS = "";
200+
201+
// Use a fresh interceptor so the cache is not pre-populated
202+
HttpClientDoExecuteInterceptor freshInterceptor = new MinimalClientDoExecuteInterceptor();
203+
freshInterceptor.beforeMethod(enhancedInstance, null, clickHouseArgs, argumentsType, null);
204+
freshInterceptor.afterMethod(enhancedInstance, null, clickHouseArgs, argumentsType, httpResponse);
205+
206+
List<TraceSegment> segments = segmentStorage.getTraceSegments();
207+
assertThat("Port 8123 should be traced when exclusion list is empty", segments.size(), is(1));
208+
}
209+
210+
/**
211+
* Multiple ports can be listed: verify that both excluded ports are silently
212+
* skipped while a third, non-excluded port is still traced.
213+
*/
214+
@Test
215+
public void multipleExcludedPorts_allSkipped() throws Throwable {
216+
HttpClient5PluginConfig.Plugin.HttpClient5.PROPAGATION_EXCLUDE_PORTS = "8123,9200";
217+
218+
HttpClientDoExecuteInterceptor freshInterceptor = new MinimalClientDoExecuteInterceptor();
219+
220+
// 8123 – must be excluded
221+
freshInterceptor.beforeMethod(enhancedInstance, null, clickHouseArgs, argumentsType, null);
222+
freshInterceptor.afterMethod(enhancedInstance, null, clickHouseArgs, argumentsType, httpResponse);
223+
assertThat(segmentStorage.getTraceSegments().size(), is(0));
224+
225+
// Set up a mock host on port 9200 (Elasticsearch)
226+
HttpHost esHost = org.mockito.Mockito.mock(HttpHost.class);
227+
when(esHost.getHostName()).thenReturn("es-server");
228+
when(esHost.getSchemeName()).thenReturn("http");
229+
when(esHost.getPort()).thenReturn(9200);
230+
Object[] esArgs = new Object[]{esHost, request};
231+
232+
freshInterceptor = new MinimalClientDoExecuteInterceptor();
233+
freshInterceptor.beforeMethod(enhancedInstance, null, esArgs, argumentsType, null);
234+
freshInterceptor.afterMethod(enhancedInstance, null, esArgs, argumentsType, httpResponse);
235+
assertThat("Port 9200 should also be excluded", segmentStorage.getTraceSegments().size(), is(0));
236+
}
237+
}

0 commit comments

Comments
 (0)