Skip to content

Commit 2bfcb32

Browse files
Initial support for Micrometer Observation (#3845)
* Initial support for Micrometer Observation * Add respective Observation dependencies * Refactor an `AbstractMessageHandler` logic for potential Observation hooks * Introduce an `ObservationPropagationChannelInterceptor` to propagate an `Observation` from one thread to another through message channels * Adds an example of propagation * Fixed the user code and receiving spans * * Clean up for Tracing unit test * Make `micrometer-observation` as an `api` dep - non-optional for direct API usage Co-authored-by: Marcin Grzejszczak <[email protected]>
1 parent 78fa297 commit 2bfcb32

File tree

7 files changed

+465
-38
lines changed

7 files changed

+465
-38
lines changed

build.gradle

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -491,6 +491,7 @@ project('spring-integration-core') {
491491
exclude group: 'org.springframework'
492492
}
493493
api 'io.projectreactor:reactor-core'
494+
api 'io.micrometer:micrometer-observation'
494495

495496
optionalApi 'com.fasterxml.jackson.core:jackson-databind'
496497
optionalApi 'com.fasterxml.jackson.datatype:jackson-datatype-jdk8'
@@ -512,6 +513,8 @@ project('spring-integration-core') {
512513

513514
testImplementation "org.aspectj:aspectjweaver:$aspectjVersion"
514515
testImplementation "org.hamcrest:hamcrest-core:$hamcrestVersion"
516+
testImplementation 'io.micrometer:micrometer-observation-test'
517+
testImplementation 'io.micrometer:micrometer-tracing-test'
515518
}
516519

517520
dokkaHtmlPartial {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
/*
2+
* Copyright 2022 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.channel.interceptor;
18+
19+
import org.springframework.aop.support.AopUtils;
20+
import org.springframework.integration.channel.DirectChannel;
21+
import org.springframework.messaging.Message;
22+
import org.springframework.messaging.MessageChannel;
23+
import org.springframework.messaging.MessageHandler;
24+
import org.springframework.util.Assert;
25+
26+
import io.micrometer.common.lang.Nullable;
27+
import io.micrometer.observation.Observation;
28+
import io.micrometer.observation.ObservationRegistry;
29+
30+
/**
31+
* The {@link org.springframework.messaging.support.ExecutorChannelInterceptor}
32+
* implementation responsible for an {@link Observation} propagation from one message
33+
* flow's thread to another through the {@link MessageChannel}s involved in the flow.
34+
* Opens a new {@link Observation.Scope} on another thread and cleans up it in the end.
35+
*
36+
* @author Artem Bilan
37+
*
38+
* @since 6.0
39+
*/
40+
public class ObservationPropagationChannelInterceptor extends ThreadStatePropagationChannelInterceptor<Observation> {
41+
42+
private final ThreadLocal<Observation.Scope> scopes = new ThreadLocal<>();
43+
44+
private final ObservationRegistry observationRegistry;
45+
46+
public ObservationPropagationChannelInterceptor(ObservationRegistry observationRegistry) {
47+
Assert.notNull(observationRegistry, "'observationRegistry' must noty be null");
48+
this.observationRegistry = observationRegistry;
49+
}
50+
51+
@Override
52+
@Nullable
53+
protected Observation obtainPropagatingContext(Message<?> message, MessageChannel channel) {
54+
if (!DirectChannel.class.isAssignableFrom(AopUtils.getTargetClass(channel))) {
55+
return this.observationRegistry.getCurrentObservation();
56+
}
57+
return null;
58+
}
59+
60+
@Override
61+
protected void populatePropagatedContext(@Nullable Observation state, Message<?> message, MessageChannel channel) {
62+
if (state != null) {
63+
Observation.Scope scope = state.openScope();
64+
this.scopes.set(scope);
65+
}
66+
}
67+
68+
@Override
69+
public void afterMessageHandled(Message<?> message, MessageChannel channel, MessageHandler handler, Exception ex) {
70+
Observation.Scope scope = this.scopes.get();
71+
if (scope != null && scope == this.observationRegistry.getCurrentObservationScope()) {
72+
scope.close();
73+
this.scopes.remove();
74+
}
75+
}
76+
77+
}

spring-integration-core/src/main/java/org/springframework/integration/channel/interceptor/ThreadStatePropagationChannelInterceptor.java

Lines changed: 7 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323
import org.springframework.messaging.MessageHeaders;
2424
import org.springframework.messaging.support.ExecutorChannelInterceptor;
2525

26+
import io.micrometer.common.lang.Nullable;
27+
2628
/**
2729
* The {@link ExecutorChannelInterceptor} implementation responsible for
2830
* the {@link Thread} (any?) state propagation from one message flow's thread to another
@@ -47,16 +49,16 @@
4749
*
4850
* @author Artem Bilan
4951
* @author Gary Russell
52+
*
5053
* @since 4.2
5154
*/
52-
public abstract class ThreadStatePropagationChannelInterceptor<S>
53-
implements ExecutorChannelInterceptor {
55+
public abstract class ThreadStatePropagationChannelInterceptor<S> implements ExecutorChannelInterceptor {
5456

5557
@Override
5658
public final Message<?> preSend(Message<?> message, MessageChannel channel) {
5759
S threadContext = obtainPropagatingContext(message, channel);
5860
if (threadContext != null) {
59-
return new MessageWithThreadState<S>(message, threadContext);
61+
return new MessageWithThreadState<>(message, threadContext);
6062
}
6163
else {
6264
return message;
@@ -80,15 +82,10 @@ public final Message<?> beforeHandle(Message<?> message, MessageChannel channel,
8082
return postReceive(message, channel);
8183
}
8284

83-
@Override
84-
public void afterMessageHandled(Message<?> message, MessageChannel channel, MessageHandler handler,
85-
Exception ex) {
86-
// No-op
87-
}
88-
85+
@Nullable
8986
protected abstract S obtainPropagatingContext(Message<?> message, MessageChannel channel);
9087

91-
protected abstract void populatePropagatedContext(S state, Message<?> message, MessageChannel channel);
88+
protected abstract void populatePropagatedContext(@Nullable S state, Message<?> message, MessageChannel channel);
9289

9390

9491
private static final class MessageWithThreadState<S> implements Message<Object>, MessageDecorator {
@@ -129,4 +126,3 @@ public String toString() {
129126
}
130127

131128
}
132-

spring-integration-core/src/main/java/org/springframework/integration/handler/AbstractMessageHandler.java

Lines changed: 25 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2020 the original author or authors.
2+
* Copyright 2002-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -39,31 +39,42 @@ public abstract class AbstractMessageHandler extends MessageHandlerSupport
3939

4040
@Override // NOSONAR
4141
public void handleMessage(Message<?> message) {
42-
Message<?> messageToUse = message;
43-
Assert.notNull(messageToUse, "Message must not be null");
42+
Assert.notNull(message, "Message must not be null");
4443
if (isLoggingEnabled() && this.logger.isDebugEnabled()) {
45-
this.logger.debug(this + " received message: " + messageToUse);
44+
this.logger.debug(this + " received message: " + message);
4645
}
47-
SampleFacade sample = null;
4846
MetricsCaptor metricsCaptor = getMetricsCaptor();
4947
if (metricsCaptor != null) {
50-
sample = metricsCaptor.start();
48+
handleWithMetrics(message, metricsCaptor);
49+
}
50+
else {
51+
doHandleMessage(message);
52+
}
53+
}
54+
55+
private void handleWithMetrics(Message<?> message, MetricsCaptor metricsCaptor) {
56+
SampleFacade sample = metricsCaptor.start();
57+
try {
58+
doHandleMessage(message);
59+
sample.stop(sendTimer());
5160
}
61+
catch (Exception ex) {
62+
sample.stop(buildSendTimer(false, ex.getClass().getSimpleName()));
63+
throw ex;
64+
}
65+
}
66+
67+
private void doHandleMessage(Message<?> message) {
68+
Message<?> messageToUse = message;
5269
try {
5370
if (shouldTrack()) {
5471
messageToUse = MessageHistory.write(messageToUse, this, getMessageBuilderFactory());
5572
}
5673
handleMessageInternal(messageToUse);
57-
if (sample != null) {
58-
sample.stop(sendTimer());
59-
}
6074
}
61-
catch (Exception e) {
62-
if (sample != null) {
63-
sample.stop(buildSendTimer(false, e.getClass().getSimpleName()));
64-
}
75+
catch (Exception ex) {
6576
throw IntegrationUtils.wrapInHandlingExceptionIfNecessary(messageToUse,
66-
() -> "error occurred in message handler [" + this + "]", e);
77+
() -> "error occurred in message handler [" + this + "]", ex);
6778
}
6879
}
6980

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
/**
2+
* Provides classes to support of Micrometer Observation API.
3+
*/
4+
5+
@org.springframework.lang.NonNullApi
6+
@org.springframework.lang.NonNullFields
7+
package org.springframework.integration.support.management.observation;

0 commit comments

Comments
 (0)