Skip to content

Commit 4456caf

Browse files
Introduce high-level API for flows composition (#3624)
* Introduce high-level API for flows composition For better end-user experience and more smooth integration logic decomposition and distribution introduce an `IntegrationFlows.from(IntegrationFlow)` to let to start the current flow from existing one. On the other hand introduce an `BaseIntegrationFlowDefinition.to(IntegrationFlow)` to let to continue the flow logic in the other existing one. This way we can extract some templating logic into separate `IntegrationFlow` definitions allowing at the same time to decompose a complex flow definition into logical reusable parts * * Add more tests * * Fix Checkstyle violation * Add `@SuppressWarnings("overloads")` to new `from(IntegrationFlow)` and existing `from(Publisher)`. Technically it does not make sense since `PublisherIntegrationFlow` is not a `public` class * * Add docs * Fix language in JavaDocs according review * Fix language in the docs after review Co-authored-by: Gary Russell <[email protected]> Co-authored-by: Gary Russell <[email protected]>
1 parent b8785e5 commit 4456caf

File tree

10 files changed

+379
-4
lines changed

10 files changed

+379
-4
lines changed

spring-integration-core/src/main/java/org/springframework/integration/config/ConsumerEndpointFactoryBean.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,10 @@ public void setHandler(Object handler) {
137137
}
138138
}
139139

140+
public MessageHandler getHandler() {
141+
return this.handler;
142+
}
143+
140144
public void setInputChannel(MessageChannel inputChannel) {
141145
this.inputChannel = inputChannel;
142146
}

spring-integration-core/src/main/java/org/springframework/integration/dsl/BaseIntegrationFlowDefinition.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2917,6 +2917,18 @@ public IntegrationFlow nullChannel() {
29172917
.get();
29182918
}
29192919

2920+
/**
2921+
* Finish this flow with delegation to other {@link IntegrationFlow} instance.
2922+
* @param other the {@link IntegrationFlow} to compose with.
2923+
* @return The {@link IntegrationFlow} instance based on this definition.
2924+
* @since 5.5.4
2925+
*/
2926+
public IntegrationFlow to(IntegrationFlow other) {
2927+
MessageChannel otherFlowInputChannel = obtainInputChannelFromFlow(other);
2928+
return channel(otherFlowInputChannel)
2929+
.get();
2930+
}
2931+
29202932
/**
29212933
* Represent an Integration Flow as a Reactive Streams {@link Publisher} bean.
29222934
* @param <T> the expected {@code payload} type

spring-integration-core/src/main/java/org/springframework/integration/dsl/IntegrationFlow.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2019 the original author or authors.
2+
* Copyright 2016-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -16,6 +16,8 @@
1616

1717
package org.springframework.integration.dsl;
1818

19+
import java.util.Map;
20+
1921
import org.springframework.messaging.MessageChannel;
2022

2123
/**
@@ -92,4 +94,13 @@ default MessageChannel getInputChannel() {
9294
return null;
9395
}
9496

97+
/**
98+
* Return a map of integration components managed by this flow (if any).
99+
* @return the map of integration components managed by this flow.
100+
* @since 5.5.4
101+
*/
102+
default Map<Object, String> getIntegrationComponents() {
103+
return null;
104+
}
105+
95106
}

spring-integration-core/src/main/java/org/springframework/integration/dsl/IntegrationFlowAdapter.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2020 the original author or authors.
2+
* Copyright 2016-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -16,6 +16,7 @@
1616

1717
package org.springframework.integration.dsl;
1818

19+
import java.util.Map;
1920
import java.util.concurrent.atomic.AtomicBoolean;
2021
import java.util.function.Consumer;
2122
import java.util.function.Supplier;
@@ -79,6 +80,10 @@ public MessageChannel getInputChannel() {
7980
return this.targetIntegrationFlow.getInputChannel();
8081
}
8182

83+
@Override public Map<Object, String> getIntegrationComponents() {
84+
return this.targetIntegrationFlow.getIntegrationComponents();
85+
}
86+
8287
@Override
8388
public void start() {
8489
assertTargetIntegrationFlow();

spring-integration-core/src/main/java/org/springframework/integration/dsl/IntegrationFlows.java

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,22 +16,29 @@
1616

1717
package org.springframework.integration.dsl;
1818

19+
import java.util.Map;
1920
import java.util.function.Consumer;
2021
import java.util.function.Supplier;
2122

2223
import org.reactivestreams.Publisher;
2324

25+
import org.springframework.aop.framework.Advised;
26+
import org.springframework.beans.factory.BeanCreationException;
2427
import org.springframework.integration.channel.DirectChannel;
2528
import org.springframework.integration.channel.FluxMessageChannel;
29+
import org.springframework.integration.channel.PublishSubscribeChannel;
30+
import org.springframework.integration.config.ConsumerEndpointFactoryBean;
2631
import org.springframework.integration.core.MessageSource;
2732
import org.springframework.integration.dsl.support.FixedSubscriberChannelPrototype;
2833
import org.springframework.integration.dsl.support.MessageChannelReference;
2934
import org.springframework.integration.endpoint.AbstractMessageSource;
3035
import org.springframework.integration.endpoint.MessageProducerSupport;
3136
import org.springframework.integration.gateway.MessagingGatewaySupport;
37+
import org.springframework.integration.handler.AbstractMessageProducingHandler;
3238
import org.springframework.lang.Nullable;
3339
import org.springframework.messaging.Message;
3440
import org.springframework.messaging.MessageChannel;
41+
import org.springframework.messaging.MessageHandler;
3542
import org.springframework.util.Assert;
3643

3744
/**
@@ -328,12 +335,52 @@ public static IntegrationFlowBuilder from(Class<?> serviceInterface,
328335
* @param publisher the {@link Publisher} to subscribe to.
329336
* @return new {@link IntegrationFlowBuilder}.
330337
*/
338+
@SuppressWarnings("overloads")
331339
public static IntegrationFlowBuilder from(Publisher<? extends Message<?>> publisher) {
332340
FluxMessageChannel reactiveChannel = new FluxMessageChannel();
333341
reactiveChannel.subscribeTo(publisher);
334342
return from((MessageChannel) reactiveChannel);
335343
}
336344

345+
/**
346+
* Start the flow with a composition from the {@link IntegrationFlow}.
347+
* @param other the {@link IntegrationFlow} from which to compose.
348+
* @return new {@link IntegrationFlowBuilder}.
349+
* @since 5.5.4
350+
*/
351+
@SuppressWarnings("overloads")
352+
public static IntegrationFlowBuilder from(IntegrationFlow other) {
353+
Map<Object, String> integrationComponents = other.getIntegrationComponents();
354+
Assert.notNull(integrationComponents, () ->
355+
"The provided integration flow to compose from '" + other +
356+
"' must be declared as a bean in the application context");
357+
Object lastIntegrationComponentFromOther =
358+
integrationComponents.keySet().stream().reduce((prev, next) -> next).orElse(null);
359+
if (lastIntegrationComponentFromOther instanceof MessageChannel) {
360+
return from((MessageChannel) lastIntegrationComponentFromOther);
361+
}
362+
else if (lastIntegrationComponentFromOther instanceof ConsumerEndpointFactoryBean) {
363+
MessageHandler handler = ((ConsumerEndpointFactoryBean) lastIntegrationComponentFromOther).getHandler();
364+
handler = extractProxyTarget(handler);
365+
if (handler instanceof AbstractMessageProducingHandler) {
366+
return buildFlowFromOutputChannel((AbstractMessageProducingHandler) handler);
367+
}
368+
lastIntegrationComponentFromOther = handler; // for the exception message below
369+
}
370+
throw new BeanCreationException("The 'IntegrationFlow' to start from must end with " +
371+
"a 'MessageChannel' or reply-producing endpoint to let the result from that flow to be " +
372+
"processed in this instance. The provided flow ends with: " + lastIntegrationComponentFromOther);
373+
}
374+
375+
private static IntegrationFlowBuilder buildFlowFromOutputChannel(AbstractMessageProducingHandler handler) {
376+
MessageChannel outputChannel = handler.getOutputChannel();
377+
if (outputChannel == null) {
378+
outputChannel = new PublishSubscribeChannel();
379+
handler.setOutputChannel(outputChannel);
380+
}
381+
return from(outputChannel);
382+
}
383+
337384
private static IntegrationFlowBuilder from(MessagingGatewaySupport inboundGateway,
338385
@Nullable IntegrationFlowBuilder integrationFlowBuilderArg) {
339386

@@ -360,6 +407,20 @@ private static IntegrationFlowBuilder registerComponents(Object spec) {
360407
return null;
361408
}
362409

410+
@SuppressWarnings("unchecked")
411+
private static <T> T extractProxyTarget(T target) {
412+
if (!(target instanceof Advised)) {
413+
return target;
414+
}
415+
Advised advised = (Advised) target;
416+
try {
417+
return (T) extractProxyTarget(advised.getTargetSource().getTarget());
418+
}
419+
catch (Exception e) {
420+
throw new BeanCreationException("Could not extract target", e);
421+
}
422+
}
423+
363424
private IntegrationFlows() {
364425
}
365426

spring-integration-core/src/main/java/org/springframework/integration/dsl/context/IntegrationFlowBeanPostProcessor.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2020 the original author or authors.
2+
* Copyright 2016-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -321,6 +321,7 @@ private Object processIntegrationFlowImpl(IntegrationFlow flow, String beanName)
321321
new NameMatchMethodPointcutAdvisor(new IntegrationFlowLifecycleAdvice(target));
322322
integrationFlowAdvice.setMappedNames(
323323
"getInputChannel",
324+
"getIntegrationComponents",
324325
"start",
325326
"stop",
326327
"isRunning",

spring-integration-core/src/main/java/org/springframework/integration/dsl/context/IntegrationFlowLifecycleAdvice.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2018-2019 the original author or authors.
2+
* Copyright 2018-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -75,6 +75,12 @@ public Object invoke(MethodInvocation invocation) throws Throwable {
7575
result = this.delegate.getInputChannel();
7676
}
7777
}
78+
else if ("getIntegrationComponents".equals(method)) {
79+
result = invocation.proceed();
80+
if (result == null) {
81+
result = this.delegate.getIntegrationComponents();
82+
}
83+
}
7884
else {
7985
if (target instanceof SmartLifecycle) {
8086
result = invocation.proceed();

0 commit comments

Comments
 (0)