|
20 | 20 |
|
21 | 21 | import org.apache.skywalking.apm.agent.core.context.ContextManager; |
22 | 22 | import org.apache.skywalking.apm.agent.core.context.ContextSnapshot; |
| 23 | +import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan; |
| 24 | +import org.apache.skywalking.apm.agent.core.context.trace.SpanLayer; |
23 | 25 | import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance; |
24 | 26 | import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.v2.InstanceMethodsAroundInterceptorV2; |
25 | 27 | import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.v2.MethodInvocationContext; |
| 28 | +import org.apache.skywalking.apm.network.trace.component.ComponentsDefine; |
26 | 29 | import reactor.core.publisher.Flux; |
27 | 30 | import reactor.core.publisher.Mono; |
28 | 31 | import reactor.util.context.Context; |
@@ -53,31 +56,34 @@ public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allA |
53 | 56 | if (!(ret instanceof Mono) && !(ret instanceof Flux)) { |
54 | 57 | return ret; |
55 | 58 | } |
| 59 | + final AbstractSpan localSpan = ContextManager.createLocalSpan("Lettuce/Reactive/" + method.getName()); |
| 60 | + localSpan.setComponent(ComponentsDefine.LETTUCE); |
| 61 | + SpanLayer.asCache(localSpan); |
56 | 62 |
|
57 | | - final ContextSnapshot snapshot; |
58 | | - if (ContextManager.isActive()) { |
59 | | - snapshot = ContextManager.capture(); |
60 | | - } else { |
61 | | - return ret; |
62 | | - } |
| 63 | + try { |
| 64 | + final ContextSnapshot snapshot = ContextManager.capture(); |
63 | 65 |
|
64 | | - Function<Context, Context> contextFunction = ctx -> { |
65 | | - if (ctx.hasKey(SNAPSHOT_KEY)) { |
66 | | - return ctx; |
67 | | - } |
68 | | - return ctx.put(SNAPSHOT_KEY, snapshot); |
69 | | - }; |
| 66 | + Function<Context, Context> contextFunction = ctx -> { |
| 67 | + if (ctx.hasKey(SNAPSHOT_KEY)) { |
| 68 | + return ctx; |
| 69 | + } |
| 70 | + return ctx.put(SNAPSHOT_KEY, snapshot); |
| 71 | + }; |
70 | 72 |
|
71 | | - if (ret instanceof Mono) { |
72 | | - Mono<?> original = (Mono<?>) ret; |
73 | | - return original.subscriberContext(contextFunction); |
74 | | - } else { |
75 | | - Flux<?> original = (Flux<?>) ret; |
76 | | - return original.subscriberContext(contextFunction); |
| 73 | + if (ret instanceof Mono) { |
| 74 | + Mono<?> original = (Mono<?>) ret; |
| 75 | + return original.subscriberContext(contextFunction); |
| 76 | + } else { |
| 77 | + Flux<?> original = (Flux<?>) ret; |
| 78 | + return original.subscriberContext(contextFunction); |
| 79 | + } |
| 80 | + } finally { |
| 81 | + ContextManager.stopSpan(); |
77 | 82 | } |
78 | 83 | } |
79 | 84 |
|
80 | 85 | @Override |
81 | | - public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, Throwable t, MethodInvocationContext context) { |
| 86 | + public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] |
| 87 | + argumentsTypes, Throwable t, MethodInvocationContext context) { |
82 | 88 | } |
83 | 89 | } |
0 commit comments