diff --git a/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/dsl/AmqpInboundChannelAdapterSpec.java b/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/dsl/AmqpInboundChannelAdapterSpec.java index 094c2f010f7..1209d3b6186 100644 --- a/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/dsl/AmqpInboundChannelAdapterSpec.java +++ b/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/dsl/AmqpInboundChannelAdapterSpec.java @@ -1,5 +1,5 @@ /* - * Copyright 2014-2022 the original author or authors. + * Copyright 2014-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -43,13 +43,13 @@ public abstract class AmqpInboundChannelAdapterSpec protected final MessageListenerContainerSpec listenerContainerSpec; // NOSONAR final protected AmqpInboundChannelAdapterSpec(MessageListenerContainerSpec listenerContainerSpec) { - super(new AmqpInboundChannelAdapter(listenerContainerSpec.get())); + super(new AmqpInboundChannelAdapter(listenerContainerSpec.getObject())); this.listenerContainerSpec = listenerContainerSpec; } @Override public Map getComponentsToRegister() { - return Collections.singletonMap(this.listenerContainerSpec.get(), this.listenerContainerSpec.getId()); + return Collections.singletonMap(this.listenerContainerSpec.getObject(), this.listenerContainerSpec.getId()); } } diff --git a/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/dsl/AmqpInboundGatewaySpec.java b/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/dsl/AmqpInboundGatewaySpec.java index 52e611a9dac..ec66787129e 100644 --- a/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/dsl/AmqpInboundGatewaySpec.java +++ b/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/dsl/AmqpInboundGatewaySpec.java @@ -1,5 +1,5 @@ /* - * Copyright 2014-2022 the original author or authors. + * Copyright 2014-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -43,7 +43,7 @@ public abstract class AmqpInboundGatewaySpec protected final AbstractMessageListenerContainerSpec listenerContainerSpec; // NOSONAR final protected AmqpInboundGatewaySpec(AbstractMessageListenerContainerSpec listenerContainerSpec) { - super(new AmqpInboundGateway(listenerContainerSpec.get())); + super(new AmqpInboundGateway(listenerContainerSpec.getObject())); this.listenerContainerSpec = listenerContainerSpec; } @@ -53,16 +53,16 @@ protected AmqpInboundGatewaySpec(AbstractMessageListenerContainerSpec list * @param listenerContainerSpec the {@link AbstractMessageListenerContainerSpec} to use. * @param amqpTemplate the {@link AmqpTemplate} to use. */ - AmqpInboundGatewaySpec( - AbstractMessageListenerContainerSpec listenerContainerSpec, + AmqpInboundGatewaySpec(AbstractMessageListenerContainerSpec listenerContainerSpec, AmqpTemplate amqpTemplate) { - super(new AmqpInboundGateway(listenerContainerSpec.get(), amqpTemplate)); + + super(new AmqpInboundGateway(listenerContainerSpec.getObject(), amqpTemplate)); this.listenerContainerSpec = listenerContainerSpec; } @Override public Map getComponentsToRegister() { - return Collections.singletonMap(this.listenerContainerSpec.get(), this.listenerContainerSpec.getId()); + return Collections.singletonMap(this.listenerContainerSpec.getObject(), this.listenerContainerSpec.getId()); } } diff --git a/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/dsl/RabbitStreamMessageHandlerSpec.java b/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/dsl/RabbitStreamMessageHandlerSpec.java index fde1aef5212..9f703c07d8a 100644 --- a/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/dsl/RabbitStreamMessageHandlerSpec.java +++ b/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/dsl/RabbitStreamMessageHandlerSpec.java @@ -1,5 +1,5 @@ /* - * Copyright 2022 the original author or authors. + * Copyright 2022-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -122,7 +122,7 @@ public RabbitStreamMessageHandlerSpec sendFailureChannel(String channel) { * Set to true to wait for a confirmation. * @param sync true to wait. * @return this spec. - * @see #setConfirmTimeout(long) + * @see #confirmTimeout(long) */ public RabbitStreamMessageHandlerSpec sync(boolean sync) { this.target.setSync(sync); diff --git a/spring-integration-amqp/src/test/java/org/springframework/integration/amqp/dsl/AmqpTests.java b/spring-integration-amqp/src/test/java/org/springframework/integration/amqp/dsl/AmqpTests.java index 1a4ca0cf817..3b457d338d4 100644 --- a/spring-integration-amqp/src/test/java/org/springframework/integration/amqp/dsl/AmqpTests.java +++ b/spring-integration-amqp/src/test/java/org/springframework/integration/amqp/dsl/AmqpTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2014-2022 the original author or authors. + * Copyright 2014-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -50,6 +50,7 @@ import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.integration.amqp.channel.AbstractAmqpChannel; +import org.springframework.integration.amqp.channel.PollableAmqpChannel; import org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter.BatchMode; import org.springframework.integration.amqp.inbound.AmqpInboundGateway; import org.springframework.integration.amqp.support.AmqpHeaderMapper; @@ -472,15 +473,16 @@ public IntegrationFlow amqpAsyncOutboundFlow(AsyncRabbitTemplate asyncRabbitTemp } @Bean - public AbstractAmqpChannel unitChannel(ConnectionFactory rabbitConnectionFactory) { + public AmqpPollableMessageChannelSpec unitChannel( + ConnectionFactory rabbitConnectionFactory) { + return Amqp.pollableChannel(rabbitConnectionFactory) .queueName("si.dsl.test") .channelTransacted(true) .extractPayload(true) .inboundHeaderMapper(mapperIn()) .outboundHeaderMapper(mapperOut()) - .defaultDeliveryMode(MessageDeliveryMode.NON_PERSISTENT) - .get(); + .defaultDeliveryMode(MessageDeliveryMode.NON_PERSISTENT); } @Bean diff --git a/spring-integration-amqp/src/test/java/org/springframework/integration/amqp/outbound/RabbitStreamMessageHandlerTests.java b/spring-integration-amqp/src/test/java/org/springframework/integration/amqp/outbound/RabbitStreamMessageHandlerTests.java index 0088383b00e..3580fbae9a8 100644 --- a/spring-integration-amqp/src/test/java/org/springframework/integration/amqp/outbound/RabbitStreamMessageHandlerTests.java +++ b/spring-integration-amqp/src/test/java/org/springframework/integration/amqp/outbound/RabbitStreamMessageHandlerTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2021-2022 the original author or authors. + * Copyright 2021-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -36,6 +36,8 @@ /** * @author Gary Russell * @author Chris Bono + * @author Artem Bilan + * * @since 6.0 */ public class RabbitStreamMessageHandlerTests implements RabbitTestContainer { @@ -56,7 +58,7 @@ void convertAndSend() throws InterruptedException { RabbitStreamMessageHandler handler = RabbitStream.outboundStreamAdapter(streamTemplate) .sync(true) - .get(); + .getObject(); handler.handleMessage(MessageBuilder.withPayload("foo") .setHeader("bar", "baz") diff --git a/spring-integration-core/src/main/java/org/springframework/integration/dsl/BaseIntegrationFlowDefinition.java b/spring-integration-core/src/main/java/org/springframework/integration/dsl/BaseIntegrationFlowDefinition.java index f004c2a0a85..e0f624a6c16 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/dsl/BaseIntegrationFlowDefinition.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/dsl/BaseIntegrationFlowDefinition.java @@ -1,5 +1,5 @@ /* - * Copyright 2019-2022 the original author or authors. + * Copyright 2019-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -19,7 +19,6 @@ import java.util.HashSet; import java.util.LinkedHashMap; import java.util.Map; -import java.util.Optional; import java.util.Set; import java.util.concurrent.Executor; import java.util.function.Consumer; @@ -233,7 +232,7 @@ public B fixedSubscriberChannel(String messageChannelName) { * at the current {@link IntegrationFlow} chain position. * The provided {@code messageChannelName} is used for the bean registration * ({@link org.springframework.integration.channel.DirectChannel}), if there is no such a bean - * in the application context. Otherwise the existing {@link MessageChannel} bean is used + * in the application context. Otherwise, the existing {@link MessageChannel} bean is used * to wire integration endpoints. * @param messageChannelName the bean name to use. * @return the current {@link BaseIntegrationFlowDefinition}. @@ -252,7 +251,7 @@ public B channel(String messageChannelName) { */ public B channel(MessageChannelSpec messageChannelSpec) { Assert.notNull(messageChannelSpec, "'messageChannelSpec' must not be null"); - return channel(messageChannelSpec.get()); + return channel(messageChannelSpec.getObject()); } /** @@ -367,7 +366,7 @@ public B wireTap(IntegrationFlow flow) { * } * * This method can be used after any {@link #channel} for explicit {@link MessageChannel}, - * but with the caution do not impact existing {@link org.springframework.messaging.support.ChannelInterceptor}s. + * but with the caution do not impact existing {@link ChannelInterceptor}s. * @param wireTapChannel the {@link MessageChannel} bean name to wire-tap. * @return the current {@link BaseIntegrationFlowDefinition}. */ @@ -377,8 +376,7 @@ public B wireTap(String wireTapChannel) { /** * Populate the {@code Wire Tap} EI Pattern specific - * {@link org.springframework.messaging.support.ChannelInterceptor} implementation - * to the current {@link #currentMessageChannel}. + * {@link ChannelInterceptor} implementation to the current {@link #currentMessageChannel}. * It is useful when an implicit {@link MessageChannel} is used between endpoints: *
 	 * {@code
@@ -388,7 +386,7 @@ public B wireTap(String wireTapChannel) {
 	 * }
 	 * 
* This method can be used after any {@link #channel} for explicit {@link MessageChannel}, - * but with the caution do not impact existing {@link org.springframework.messaging.support.ChannelInterceptor}s. + * but with the caution do not impact existing {@link ChannelInterceptor}s. * @param wireTapChannel the {@link MessageChannel} to wire-tap. * @return the current {@link BaseIntegrationFlowDefinition}. */ @@ -398,8 +396,7 @@ public B wireTap(MessageChannel wireTapChannel) { /** * Populate the {@code Wire Tap} EI Pattern specific - * {@link org.springframework.messaging.support.ChannelInterceptor} implementation - * to the current {@link #currentMessageChannel}. + * {@link ChannelInterceptor} implementation to the current {@link #currentMessageChannel}. * It is useful when an implicit {@link MessageChannel} is used between endpoints: *
 	 * {@code
@@ -409,7 +406,7 @@ public B wireTap(MessageChannel wireTapChannel) {
 	 * }
 	 * 
* This method can be used after any {@link #channel} for explicit {@link MessageChannel}, - * but with the caution do not impact existing {@link org.springframework.messaging.support.ChannelInterceptor}s. + * but with the caution do not impact existing {@link ChannelInterceptor}s. * @param flow the {@link IntegrationFlow} for wire-tap subflow as an alternative to the {@code wireTapChannel}. * @param wireTapConfigurer the {@link Consumer} to accept options for the {@link WireTap}. * @return the current {@link BaseIntegrationFlowDefinition}. @@ -438,8 +435,7 @@ protected MessageChannel obtainInputChannelFromFlow(IntegrationFlow flow) { /** * Populate the {@code Wire Tap} EI Pattern specific - * {@link org.springframework.messaging.support.ChannelInterceptor} implementation - * to the current {@link #currentMessageChannel}. + * {@link ChannelInterceptor} implementation to the current {@link #currentMessageChannel}. * It is useful when an implicit {@link MessageChannel} is used between endpoints: *
 	 * {@code
@@ -449,7 +445,7 @@ protected MessageChannel obtainInputChannelFromFlow(IntegrationFlow flow) {
 	 * }
 	 * 
* This method can be used after any {@link #channel} for explicit {@link MessageChannel}, - * but with the caution do not impact existing {@link org.springframework.messaging.support.ChannelInterceptor}s. + * but with the caution do not impact existing {@link ChannelInterceptor}s. * @param wireTapChannel the {@link MessageChannel} bean name to wire-tap. * @param wireTapConfigurer the {@link Consumer} to accept options for the {@link WireTap}. * @return the current {@link BaseIntegrationFlowDefinition}. @@ -462,8 +458,7 @@ public B wireTap(String wireTapChannel, Consumer wireTapConfigurer) /** * Populate the {@code Wire Tap} EI Pattern specific - * {@link org.springframework.messaging.support.ChannelInterceptor} implementation - * to the current {@link #currentMessageChannel}. + * {@link ChannelInterceptor} implementation to the current {@link #currentMessageChannel}. * It is useful when an implicit {@link MessageChannel} is used between endpoints: *
 	 * {@code
@@ -473,7 +468,7 @@ public B wireTap(String wireTapChannel, Consumer wireTapConfigurer)
 	 * }
 	 * 
* This method can be used after any {@link #channel} for explicit {@link MessageChannel}, - * but with the caution do not impact existing {@link org.springframework.messaging.support.ChannelInterceptor}s. + * but with the caution do not impact existing {@link ChannelInterceptor}s. * @param wireTapChannel the {@link MessageChannel} to wire-tap. * @param wireTapConfigurer the {@link Consumer} to accept options for the {@link WireTap}. * @return the current {@link BaseIntegrationFlowDefinition}. @@ -489,8 +484,7 @@ public B wireTap(MessageChannel wireTapChannel, Consumer wireTapCon /** * Populate the {@code Wire Tap} EI Pattern specific - * {@link org.springframework.messaging.support.ChannelInterceptor} implementation - * to the current {@link #currentMessageChannel}. + * {@link ChannelInterceptor} implementation to the current {@link #currentMessageChannel}. *

It is useful when an implicit {@link MessageChannel} is used between endpoints: *

 	 * {@code
@@ -500,14 +494,16 @@ public B wireTap(MessageChannel wireTapChannel, Consumer wireTapCon
 	 * }
 	 * 
* This method can be used after any {@link #channel} for explicit {@link MessageChannel}, - * but with the caution do not impact existing {@link org.springframework.messaging.support.ChannelInterceptor}s. + * but with the caution do not impact existing {@link ChannelInterceptor}s. * @param wireTapSpec the {@link WireTapSpec} to use. - *

When this EIP-method is used in the end of flow, it appends {@code nullChannel} to terminate flow properly, - * Otherwise {@code Dispatcher has no subscribers} exception is thrown for implicit {@link DirectChannel}. + *

When this EIP-method is used in the end of flow, + * it appends a {@code nullChannel} to terminate flow properly, + * Otherwise a {@code Dispatcher has no subscribers} exception + * is thrown for implicit {@link DirectChannel}. * @return the current {@link BaseIntegrationFlowDefinition}. */ public B wireTap(WireTapSpec wireTapSpec) { - WireTap interceptor = wireTapSpec.get(); + WireTap interceptor = wireTapSpec.getObject(); InterceptableChannel currentChannel = currentInterceptableChannel(); addComponent(wireTapSpec); currentChannel.addInterceptor(interceptor); @@ -613,7 +609,7 @@ public B transform(Object service, String methodName, /** * Populate the {@link MessageTransformingHandler} instance for the - * {@link org.springframework.integration.handler.MessageProcessor} from provided {@link MessageProcessorSpec}. + * {@link MessageProcessor} from provided {@link MessageProcessorSpec}. *

 	 * {@code
 	 *  .transform(Scripts.script("classpath:myScript.py").variable("foo", bar()))
@@ -629,7 +625,7 @@ public B transform(MessageProcessorSpec messageProcessorSpec) {
 
 	/**
 	 * Populate the {@link MessageTransformingHandler} instance for the
-	 * {@link org.springframework.integration.handler.MessageProcessor} from provided {@link MessageProcessorSpec}.
+	 * {@link MessageProcessor} from provided {@link MessageProcessorSpec}.
 	 * In addition, accept options for the integration endpoint using {@link GenericEndpointSpec}.
 	 * 
 	 * {@code
@@ -646,7 +642,7 @@ public B transform(MessageProcessorSpec messageProcessorSpec,
 			Consumer> endpointConfigurer) {
 
 		Assert.notNull(messageProcessorSpec, MESSAGE_PROCESSOR_SPEC_MUST_NOT_BE_NULL);
-		MessageProcessor processor = messageProcessorSpec.get();
+		MessageProcessor processor = messageProcessorSpec.getObject();
 		return addComponent(processor)
 				.transform(null, new MethodInvokingTransformer(processor), endpointConfigurer);
 	}
@@ -832,7 +828,7 @@ public B filter(MessageProcessorSpec messageProcessorSpec) {
 	 */
 	public B filter(MessageProcessorSpec messageProcessorSpec, Consumer endpointConfigurer) {
 		Assert.notNull(messageProcessorSpec, MESSAGE_PROCESSOR_SPEC_MUST_NOT_BE_NULL);
-		MessageProcessor processor = messageProcessorSpec.get();
+		MessageProcessor processor = messageProcessorSpec.getObject();
 		return addComponent(processor)
 				.filter(null, new MethodInvokingSelector(processor), endpointConfigurer);
 	}
@@ -1089,7 +1085,7 @@ public B handle(MessageProcessorSpec messageProcessorSpec,
 			Consumer> endpointConfigurer) {
 
 		Assert.notNull(messageProcessorSpec, MESSAGE_PROCESSOR_SPEC_MUST_NOT_BE_NULL);
-		MessageProcessor processor = messageProcessorSpec.get();
+		MessageProcessor processor = messageProcessorSpec.getObject();
 		return addComponent(processor)
 				.handle(new ServiceActivatingHandler(processor), endpointConfigurer);
 	}
@@ -1118,7 +1114,7 @@ public  B handle(MessageHandlerSpec messageHandl
 		if (messageHandlerSpec instanceof ComponentsRegistration) {
 			addComponents(((ComponentsRegistration) messageHandlerSpec).getComponentsToRegister());
 		}
-		return handle(messageHandlerSpec.get(), endpointConfigurer);
+		return handle(messageHandlerSpec.getObject(), endpointConfigurer);
 	}
 
 	/**
@@ -1287,7 +1283,7 @@ public B enrichHeaders(Map headers,
 
 		HeaderEnricherSpec headerEnricherSpec = new HeaderEnricherSpec();
 		headerEnricherSpec.headers(headers);
-		Tuple2 tuple2 = headerEnricherSpec.get();
+		Tuple2 tuple2 = headerEnricherSpec.getObject();
 		return addComponents(headerEnricherSpec.getComponentsToRegister())
 				.handle(tuple2.getT2(), endpointConfigurer);
 	}
@@ -1478,7 +1474,7 @@ public B split(MessageProcessorSpec messageProcessorSpec,
 			Consumer> endpointConfigurer) {
 
 		Assert.notNull(messageProcessorSpec, MESSAGE_PROCESSOR_SPEC_MUST_NOT_BE_NULL);
-		MessageProcessor processor = messageProcessorSpec.get();
+		MessageProcessor processor = messageProcessorSpec.getObject();
 		return addComponent(processor)
 				.split(new MethodInvokingSplitter(processor), endpointConfigurer);
 	}
@@ -1569,7 +1565,7 @@ public  B split(MessageHandlerSpec spli
 	public  B split(MessageHandlerSpec splitterMessageHandlerSpec,
 			Consumer> endpointConfigurer) {
 		Assert.notNull(splitterMessageHandlerSpec, "'splitterMessageHandlerSpec' must not be null");
-		return split(splitterMessageHandlerSpec.get(), endpointConfigurer);
+		return split(splitterMessageHandlerSpec.getObject(), endpointConfigurer);
 	}
 
 	/**
@@ -1960,7 +1956,7 @@ public B route(MessageProcessorSpec messageProcessorSpec,
 			Consumer> routerConfigurer) {
 
 		Assert.notNull(messageProcessorSpec, MESSAGE_PROCESSOR_SPEC_MUST_NOT_BE_NULL);
-		MessageProcessor processor = messageProcessorSpec.get();
+		MessageProcessor processor = messageProcessorSpec.getObject();
 		addComponent(processor);
 
 		return route(new RouterSpec<>(new MethodInvokingRouter(processor)), routerConfigurer);
@@ -2713,7 +2709,7 @@ public B scatterGather(MessageChannel scatterChannel, Consumer g
 		if (gatherer != null) {
 			gatherer.accept(aggregatorSpec);
 		}
-		AggregatingMessageHandler aggregatingMessageHandler = aggregatorSpec.get().getT2();
+		AggregatingMessageHandler aggregatingMessageHandler = aggregatorSpec.getObject().getT2();
 		addComponent(aggregatingMessageHandler);
 		ScatterGatherHandler messageHandler = new ScatterGatherHandler(scatterChannel, aggregatingMessageHandler);
 		return register(new ScatterGatherSpec(messageHandler), scatterGather);
@@ -2766,10 +2762,10 @@ public B scatterGather(Consumer scatterer, @Nullable Co
 		if (gatherer != null) {
 			gatherer.accept(aggregatorSpec);
 		}
-		RecipientListRouter recipientListRouter = recipientListRouterSpec.get().getT2();
+		RecipientListRouter recipientListRouter = recipientListRouterSpec.getObject().getT2();
 		addComponent(recipientListRouter)
 				.addComponents(recipientListRouterSpec.getComponentsToRegister());
-		AggregatingMessageHandler aggregatingMessageHandler = aggregatorSpec.get().getT2();
+		AggregatingMessageHandler aggregatingMessageHandler = aggregatorSpec.getObject().getT2();
 		addComponent(aggregatingMessageHandler);
 		ScatterGatherHandler messageHandler = new ScatterGatherHandler(recipientListRouter, aggregatingMessageHandler);
 		return register(new ScatterGatherSpec(messageHandler), scatterGather);
@@ -2974,16 +2970,16 @@ protected  Publisher> toReactivePublisher(boolean autoStartOnSubsc
 			this.registerOutputChannelIfCan(inputChannel);
 		}
 
-		Tuple2 factoryBeanTuple2 = endpointSpec.get();
+		Tuple2 factoryBeanTuple2 = endpointSpec.getObject();
 
 		addComponents(endpointSpec.getComponentsToRegister());
 
-		if (inputChannel instanceof MessageChannelReference) {
-			factoryBeanTuple2.getT1().setInputChannelName(((MessageChannelReference) inputChannel).getName());
+		if (inputChannel instanceof MessageChannelReference messageChannelReference) {
+			factoryBeanTuple2.getT1().setInputChannelName(messageChannelReference.getName());
 		}
 		else {
-			if (inputChannel instanceof FixedSubscriberChannelPrototype) {
-				String beanName = ((FixedSubscriberChannelPrototype) inputChannel).getName();
+			if (inputChannel instanceof FixedSubscriberChannelPrototype fixedSubscriberChannel) {
+				String beanName = fixedSubscriberChannel.getName();
 				inputChannel = new FixedSubscriberChannel(factoryBeanTuple2.getT2());
 				if (beanName != null) {
 					((FixedSubscriberChannel) inputChannel).setBeanName(beanName);
@@ -3002,8 +2998,8 @@ protected B registerOutputChannelIfCan(MessageChannel outputChannel) {
 			Object currComponent = getCurrentComponent();
 			if (currComponent != null) {
 				String channelName = null;
-				if (outputChannel instanceof MessageChannelReference) {
-					channelName = ((MessageChannelReference) outputChannel).getName();
+				if (outputChannel instanceof MessageChannelReference channelReference) {
+					channelName = channelReference.getName();
 				}
 
 				if (currComponent instanceof MessageProducer messageProducer) {
@@ -3015,9 +3011,9 @@ protected B registerOutputChannelIfCan(MessageChannel outputChannel) {
 						messageProducer.setOutputChannel(outputChannel);
 					}
 				}
-				else if (currComponent instanceof SourcePollingChannelAdapterSpec) {
+				else if (currComponent instanceof SourcePollingChannelAdapterSpec sourcePollingChannelAdapterSpec) {
 					SourcePollingChannelAdapterFactoryBean pollingChannelAdapterFactoryBean =
-							((SourcePollingChannelAdapterSpec) currComponent).get().getT1();
+							sourcePollingChannelAdapterSpec.getObject().getT1();
 					if (channelName != null) {
 						pollingChannelAdapterFactoryBean.setOutputChannelName(channelName);
 					}
@@ -3081,13 +3077,11 @@ else if (currentChannel != null) {
 			}
 
 			if (isImplicitChannel()) {
-				Optional lastComponent =
-						components.keySet()
-								.stream()
-								.reduce((first, second) -> second);
-				if (lastComponent.get() instanceof WireTapSpec) {
-					bridge();
-				}
+				components.keySet()
+						.stream()
+						.reduce((first, second) -> second)
+						.filter(WireTapSpec.class::isInstance)
+						.ifPresent((wireTap) -> bridge());
 			}
 
 			this.integrationFlow = new StandardIntegrationFlow(components);
diff --git a/spring-integration-core/src/main/java/org/springframework/integration/dsl/EndpointSpec.java b/spring-integration-core/src/main/java/org/springframework/integration/dsl/EndpointSpec.java
index b1253dd8d83..ca86c24a068 100644
--- a/spring-integration-core/src/main/java/org/springframework/integration/dsl/EndpointSpec.java
+++ b/spring-integration-core/src/main/java/org/springframework/integration/dsl/EndpointSpec.java
@@ -1,5 +1,5 @@
 /*
- * Copyright 2016-2022 the original author or authors.
+ * Copyright 2016-2023 the original author or authors.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -86,7 +86,7 @@ public S poller(PollerSpec pollerMetadataSpec) {
 		if (components != null) {
 			this.componentsToRegister.putAll(components);
 		}
-		return poller(pollerMetadataSpec.get());
+		return poller(pollerMetadataSpec.getObject());
 	}
 
 	/**
diff --git a/spring-integration-core/src/main/java/org/springframework/integration/dsl/IntegrationComponentSpec.java b/spring-integration-core/src/main/java/org/springframework/integration/dsl/IntegrationComponentSpec.java
index 9ca2ade6ad7..5337dca13fe 100644
--- a/spring-integration-core/src/main/java/org/springframework/integration/dsl/IntegrationComponentSpec.java
+++ b/spring-integration-core/src/main/java/org/springframework/integration/dsl/IntegrationComponentSpec.java
@@ -1,5 +1,5 @@
 /*
- * Copyright 2016-2022 the original author or authors.
+ * Copyright 2016-2023 the original author or authors.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -16,15 +16,21 @@
 
 package org.springframework.integration.dsl;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.springframework.beans.factory.BeanInitializationException;
 import org.springframework.beans.factory.DisposableBean;
+import org.springframework.beans.factory.FactoryBean;
 import org.springframework.beans.factory.InitializingBean;
-import org.springframework.beans.factory.config.AbstractFactoryBean;
 import org.springframework.context.Lifecycle;
 import org.springframework.context.SmartLifecycle;
 import org.springframework.expression.spel.standard.SpelExpressionParser;
 
 /**
- * The common Builder abstraction. The {@link #get()} method returns the final component.
+ * The common Builder abstraction.
+ * If used as a bean definition, must be treated as an {@link FactoryBean},
+ * therefore its {@link #getObject()} method must not be called in the target configuration.
  *
  * @param  the target {@link IntegrationComponentSpec} implementation type.
  * @param  the target type.
@@ -35,11 +41,12 @@
  */
 @IntegrationDsl
 public abstract class IntegrationComponentSpec, T>
-		extends AbstractFactoryBean
-		implements SmartLifecycle {
+		implements FactoryBean, InitializingBean, DisposableBean, SmartLifecycle {
 
 	protected static final SpelExpressionParser PARSER = new SpelExpressionParser();
 
+	protected final Log logger = LogFactory.getLog(getClass()); // NOSONAR - final
+
 	protected volatile T target; // NOSONAR
 
 	private String id;
@@ -61,78 +68,83 @@ public final String getId() {
 
 	/**
 	 * @return the configured component.
+	 * @deprecated since 6.1 with no-op for end-user:
+	 * the {@link #getObject()} is called by the framework at the appropriate phase.
 	 */
+	@Deprecated(since = "6.1", forRemoval = true)
 	public T get() {
-		if (this.target == null) {
-			this.target = doGet();
-		}
-		return this.target;
+		return getObject();
 	}
 
 	@Override
 	public Class getObjectType() {
-		return get().getClass();
+		return getObject().getClass();
 	}
 
+	/**
+	 * !!! This method must not be called from the target configuration !!!
+	 * @return the object backed by this factory bean.
+	 */
 	@Override
-	protected T createInstance() {
-		T instance = get();
-		if (instance instanceof InitializingBean) {
-			try {
-				((InitializingBean) instance).afterPropertiesSet();
-			}
-			catch (Exception e) {
-				throw new IllegalStateException("Cannot initialize bean: " + instance, e);
+	public T getObject() {
+		if (this.target == null) {
+			this.target = doGet();
+		}
+		return this.target;
+	}
+
+	@Override
+	public void afterPropertiesSet() {
+		try {
+			if (this.target instanceof InitializingBean initializingBean) {
+				initializingBean.afterPropertiesSet();
 			}
 		}
-		return instance;
+		catch (Exception ex) {
+			throw new BeanInitializationException("Cannot initialize bean: " + this.target, ex);
+		}
 	}
 
 	@Override
-	protected void destroyInstance(T instance) {
-		if (instance instanceof DisposableBean) {
+	public void destroy() {
+		if (this.target instanceof DisposableBean disposableBean) {
 			try {
-				((DisposableBean) instance).destroy();
+				disposableBean.destroy();
 			}
 			catch (Exception e) {
-				throw new IllegalStateException("Cannot destroy bean: " + instance, e);
+				throw new IllegalStateException("Cannot destroy bean: " + this.target, e);
 			}
 		}
 	}
 
 	@Override
 	public void start() {
-		T instance = get();
-		if (instance instanceof Lifecycle) {
-			((Lifecycle) instance).start();
+		if (this.target instanceof Lifecycle lifecycle) {
+			lifecycle.start();
 		}
 	}
 
 	@Override
 	public void stop() {
-		T instance = get();
-		if (instance instanceof Lifecycle) {
-			((Lifecycle) instance).stop();
+		if (this.target instanceof Lifecycle lifecycle) {
+			lifecycle.stop();
 		}
 	}
 
 	@Override
 	public boolean isRunning() {
-		T instance = get();
-		return !(instance instanceof Lifecycle) || ((Lifecycle) instance).isRunning();
+		return !(this.target instanceof Lifecycle lifecycle) || lifecycle.isRunning();
 	}
 
 	@Override
 	public boolean isAutoStartup() {
-		T instance = get();
-		return instance instanceof SmartLifecycle && ((SmartLifecycle) instance).isAutoStartup();
+		return this.target instanceof SmartLifecycle lifecycle && lifecycle.isAutoStartup();
 	}
 
 	@Override
 	public void stop(Runnable callback) {
-		T instance = get();
-		if (instance instanceof SmartLifecycle) {
-			((SmartLifecycle) instance).stop(callback);
+		if (this.target instanceof SmartLifecycle lifecycle) {
+			lifecycle.stop(callback);
 		}
 		else {
 			callback.run();
@@ -141,9 +153,8 @@ public void stop(Runnable callback) {
 
 	@Override
 	public int getPhase() {
-		T instance = get();
-		if (instance instanceof SmartLifecycle) {
-			return ((SmartLifecycle) instance).getPhase();
+		if (this.target instanceof SmartLifecycle lifecycle) {
+			return lifecycle.getPhase();
 		}
 		else {
 			return 0;
diff --git a/spring-integration-core/src/main/java/org/springframework/integration/dsl/IntegrationFlow.java b/spring-integration-core/src/main/java/org/springframework/integration/dsl/IntegrationFlow.java
index 96c076dcc27..f776d82d410 100644
--- a/spring-integration-core/src/main/java/org/springframework/integration/dsl/IntegrationFlow.java
+++ b/spring-integration-core/src/main/java/org/springframework/integration/dsl/IntegrationFlow.java
@@ -1,5 +1,5 @@
 /*
- * Copyright 2016-2022 the original author or authors.
+ * Copyright 2016-2023 the original author or authors.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -151,7 +151,7 @@ static IntegrationFlowBuilder from(String messageChannelName) {
 	 */
 	static IntegrationFlowBuilder from(MessageChannelSpec messageChannelSpec) {
 		Assert.notNull(messageChannelSpec, "'messageChannelSpec' must not be null");
-		return from(messageChannelSpec.get());
+		return from(messageChannelSpec.getObject());
 	}
 
 	/**
@@ -217,7 +217,7 @@ static IntegrationFlowBuilder from(MessageSourceSpec endpointConfigurer) {
 
 		Assert.notNull(messageSourceSpec, "'messageSourceSpec' must not be null");
-		return from(messageSourceSpec.get(), endpointConfigurer, registerComponents(messageSourceSpec));
+		return from(messageSourceSpec.getObject(), endpointConfigurer, registerComponents(messageSourceSpec));
 	}
 
 	/**
@@ -321,7 +321,7 @@ private static IntegrationFlowBuilder from(MessageSource messageSource,
 	 * @see MessageProducerSpec
 	 */
 	static IntegrationFlowBuilder from(MessageProducerSpec messageProducerSpec) {
-		return from(messageProducerSpec.get(), registerComponents(messageProducerSpec));
+		return from(messageProducerSpec.getObject(), registerComponents(messageProducerSpec));
 	}
 
 	/**
@@ -362,7 +362,7 @@ private static IntegrationFlowBuilder from(MessageProducerSupport messageProduce
 	 * @since 6.0
 	 */
 	static IntegrationFlowBuilder from(MessagingGatewaySpec inboundGatewaySpec) {
-		return from(inboundGatewaySpec.get(), registerComponents(inboundGatewaySpec));
+		return from(inboundGatewaySpec.getObject(), registerComponents(inboundGatewaySpec));
 	}
 
 	/**
@@ -445,14 +445,14 @@ static IntegrationFlowBuilder from(IntegrationFlow other) {
 						"' must be declared as a bean in the application context");
 		Object lastIntegrationComponentFromOther =
 				integrationComponents.keySet().stream().reduce((prev, next) -> next).orElse(null);
-		if (lastIntegrationComponentFromOther instanceof MessageChannel) {
-			return from((MessageChannel) lastIntegrationComponentFromOther);
+		if (lastIntegrationComponentFromOther instanceof MessageChannel messageChannel) {
+			return from(messageChannel);
 		}
-		else if (lastIntegrationComponentFromOther instanceof ConsumerEndpointFactoryBean) {
-			MessageHandler handler = ((ConsumerEndpointFactoryBean) lastIntegrationComponentFromOther).getHandler();
+		else if (lastIntegrationComponentFromOther instanceof ConsumerEndpointFactoryBean factoryBean) {
+			MessageHandler handler = factoryBean.getHandler();
 			handler = extractProxyTarget(handler);
-			if (handler instanceof AbstractMessageProducingHandler) {
-				return buildFlowFromOutputChannel((AbstractMessageProducingHandler) handler);
+			if (handler instanceof AbstractMessageProducingHandler producingHandler) {
+				return buildFlowFromOutputChannel(producingHandler);
 			}
 			lastIntegrationComponentFromOther = handler; // for the exception message below
 		}
@@ -489,9 +489,9 @@ private static IntegrationFlowBuilder from(MessagingGatewaySupport inboundGatewa
 	}
 
 	private static IntegrationFlowBuilder registerComponents(Object spec) {
-		if (spec instanceof ComponentsRegistration) {
+		if (spec instanceof ComponentsRegistration componentsRegistration) {
 			return new IntegrationFlowBuilder()
-					.addComponents(((ComponentsRegistration) spec).getComponentsToRegister());
+					.addComponents(componentsRegistration.getComponentsToRegister());
 		}
 		return null;
 	}
diff --git a/spring-integration-core/src/main/java/org/springframework/integration/dsl/MessageChannelSpec.java b/spring-integration-core/src/main/java/org/springframework/integration/dsl/MessageChannelSpec.java
index 1f07ef2eefd..8be41af56c7 100644
--- a/spring-integration-core/src/main/java/org/springframework/integration/dsl/MessageChannelSpec.java
+++ b/spring-integration-core/src/main/java/org/springframework/integration/dsl/MessageChannelSpec.java
@@ -1,5 +1,5 @@
 /*
- * Copyright 2016-2019 the original author or authors.
+ * Copyright 2016-2023 the original author or authors.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -100,7 +100,7 @@ public S wireTap(MessageChannel wireTapChannel) {
 	 * @see WireTap
 	 */
 	public S wireTap(WireTapSpec wireTapSpec) {
-		WireTap interceptor = wireTapSpec.get();
+		WireTap interceptor = wireTapSpec.getObject();
 		this.componentsToRegister.put(interceptor, null);
 		return interceptor(interceptor);
 	}
diff --git a/spring-integration-core/src/main/java/org/springframework/integration/dsl/context/DslIntegrationConfigurationInitializer.java b/spring-integration-core/src/main/java/org/springframework/integration/dsl/context/DslIntegrationConfigurationInitializer.java
index 7fc0e45b776..eba70d2c939 100644
--- a/spring-integration-core/src/main/java/org/springframework/integration/dsl/context/DslIntegrationConfigurationInitializer.java
+++ b/spring-integration-core/src/main/java/org/springframework/integration/dsl/context/DslIntegrationConfigurationInitializer.java
@@ -31,11 +31,12 @@
  * Registers {@link IntegrationFlowBeanPostProcessor} and checks if all
  * {@link org.springframework.integration.dsl.IntegrationComponentSpec} are extracted to
  * the target object using
- * {@link org.springframework.integration.dsl.IntegrationComponentSpec#get()}.
+ * {@link org.springframework.integration.dsl.IntegrationComponentSpec#getObject()}.
  *
  * @author Artem Bilan
  * @author Gary Russell
  * @author Chris Bono
+ *
  * @since 5.0
  *
  * @see org.springframework.integration.config.IntegrationConfigurationBeanFactoryPostProcessor
diff --git a/spring-integration-core/src/main/java/org/springframework/integration/dsl/context/IntegrationFlowBeanPostProcessor.java b/spring-integration-core/src/main/java/org/springframework/integration/dsl/context/IntegrationFlowBeanPostProcessor.java
index 9b7322e3759..f180061be4d 100644
--- a/spring-integration-core/src/main/java/org/springframework/integration/dsl/context/IntegrationFlowBeanPostProcessor.java
+++ b/spring-integration-core/src/main/java/org/springframework/integration/dsl/context/IntegrationFlowBeanPostProcessor.java
@@ -1,5 +1,5 @@
 /*
- * Copyright 2016-2022 the original author or authors.
+ * Copyright 2016-2023 the original author or authors.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -113,14 +113,14 @@ public void setApplicationContext(ApplicationContext applicationContext) throws
 
 	@Override
 	public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
-		if (bean instanceof StandardIntegrationFlow) {
-			return processStandardIntegrationFlow((StandardIntegrationFlow) bean, beanName);
+		if (bean instanceof StandardIntegrationFlow standardIntegrationFlow) {
+			return processStandardIntegrationFlow(standardIntegrationFlow, beanName);
 		}
-		else if (bean instanceof IntegrationFlow) {
-			return processIntegrationFlowImpl((IntegrationFlow) bean, beanName);
+		else if (bean instanceof IntegrationFlow integrationFlow) {
+			return processIntegrationFlowImpl(integrationFlow, beanName);
 		}
-		if (bean instanceof IntegrationComponentSpec) {
-			processIntegrationComponentSpec(beanName, (IntegrationComponentSpec) bean);
+		if (bean instanceof IntegrationComponentSpec integrationComponentSpec) {
+			processIntegrationComponentSpec(beanName, integrationComponentSpec);
 		}
 		return bean;
 	}
@@ -156,8 +156,8 @@ private Object processStandardIntegrationFlow(StandardIntegrationFlow flow, Stri
 		for (Map.Entry entry : integrationComponents.entrySet()) {
 			Object component = entry.getKey();
 			if (component instanceof ConsumerEndpointSpec endpointSpec) {
-				MessageHandler messageHandler = endpointSpec.get().getT2();
-				ConsumerEndpointFactoryBean endpoint = endpointSpec.get().getT1();
+				MessageHandler messageHandler = endpointSpec.getObject().getT2();
+				ConsumerEndpointFactoryBean endpoint = endpointSpec.getObject().getT1();
 				String id = endpointSpec.getId();
 
 				if (id == null) {
@@ -177,8 +177,8 @@ else if (useFlowIdAsPrefix) {
 				registerComponent(endpoint, id, flowBeanName);
 				targetIntegrationComponents.put(endpoint, id);
 			}
-			else if (component instanceof MessageChannelReference) {
-				String channelBeanName = ((MessageChannelReference) component).getName();
+			else if (component instanceof MessageChannelReference messageChannelReference) {
+				String channelBeanName = messageChannelReference.getName();
 				if (!this.beanFactory.containsBean(channelBeanName)) {
 					DirectChannel directChannel = new DirectChannel();
 					registerComponent(directChannel, channelBeanName, flowBeanName);
@@ -196,7 +196,7 @@ else if (component instanceof SourcePollingChannelAdapterSpec spec) {
 											generateBeanName(o.getKey(), flowNamePrefix, o.getValue(),
 													useFlowIdAsPrefix)));
 				}
-				SourcePollingChannelAdapterFactoryBean pollingChannelAdapterFactoryBean = spec.get().getT1();
+				SourcePollingChannelAdapterFactoryBean pollingChannelAdapterFactoryBean = spec.getObject().getT1();
 				String id = spec.getId();
 				if (id == null) {
 					id = generateBeanName(pollingChannelAdapterFactoryBean, flowNamePrefix, entry.getValue(),
@@ -209,12 +209,13 @@ else if (useFlowIdAsPrefix) {
 				registerComponent(pollingChannelAdapterFactoryBean, id, flowBeanName);
 				targetIntegrationComponents.put(pollingChannelAdapterFactoryBean, id);
 
-				MessageSource messageSource = spec.get().getT2();
+				MessageSource messageSource = spec.getObject().getT2();
 				if (noBeanPresentForComponent(messageSource, flowBeanName)) {
 					String messageSourceId = id + ".source";
-					if (messageSource instanceof NamedComponent
-							&& ((NamedComponent) messageSource).getComponentName() != null) {
-						messageSourceId = ((NamedComponent) messageSource).getComponentName();
+					if (messageSource instanceof NamedComponent namedComponent
+							&& namedComponent.getComponentName() != null) {
+
+						messageSourceId = namedComponent.getComponentName();
 					}
 					registerComponent(messageSource, messageSourceId, flowBeanName);
 				}
@@ -342,7 +343,7 @@ private Object processIntegrationFlowImpl(IntegrationFlow flow, String beanName)
 	}
 
 	private void processIntegrationComponentSpec(String beanName, IntegrationComponentSpec bean) {
-		Object target = bean.get();
+		Object target = bean.getObject();
 
 		invokeBeanInitializationHooks(beanName, target);
 
diff --git a/spring-integration-core/src/test/java/org/springframework/integration/channel/reactive/FluxMessageChannelTests.java b/spring-integration-core/src/test/java/org/springframework/integration/channel/reactive/FluxMessageChannelTests.java
index 6c4de15cab3..bed97230ba6 100644
--- a/spring-integration-core/src/test/java/org/springframework/integration/channel/reactive/FluxMessageChannelTests.java
+++ b/spring-integration-core/src/test/java/org/springframework/integration/channel/reactive/FluxMessageChannelTests.java
@@ -1,5 +1,5 @@
 /*
- * Copyright 2016-2022 the original author or authors.
+ * Copyright 2016-2023 the original author or authors.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -119,7 +119,7 @@ void testMessageChannelReactiveAdaptation() throws InterruptedException {
 
 	@Test
 	void testFluxMessageChannelCleanUp() throws InterruptedException {
-		FluxMessageChannel flux = MessageChannels.flux().get();
+		FluxMessageChannel flux = MessageChannels.flux().getObject();
 
 		CountDownLatch finishLatch = new CountDownLatch(1);
 
diff --git a/spring-integration-core/src/test/java/org/springframework/integration/dsl/PollersTests.java b/spring-integration-core/src/test/java/org/springframework/integration/dsl/PollersTests.java
index e2fc767ddd2..089c93d70d6 100644
--- a/spring-integration-core/src/test/java/org/springframework/integration/dsl/PollersTests.java
+++ b/spring-integration-core/src/test/java/org/springframework/integration/dsl/PollersTests.java
@@ -1,5 +1,5 @@
 /*
- * Copyright 2019-2022 the original author or authors.
+ * Copyright 2019-2023 the original author or authors.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -35,19 +35,19 @@ public class PollersTests {
 
 	@Test
 	public void testDurations() {
-		PeriodicTrigger trigger = (PeriodicTrigger) Pollers.fixedDelay(Duration.ofMinutes(1L)).get().getTrigger();
+		PeriodicTrigger trigger = (PeriodicTrigger) Pollers.fixedDelay(Duration.ofMinutes(1L)).getObject().getTrigger();
 		assertThat(trigger.getPeriodDuration()).isEqualTo(Duration.ofSeconds(60));
 		assertThat(trigger.isFixedRate()).isFalse();
 		trigger = (PeriodicTrigger) Pollers.fixedDelay(Duration.ofMinutes(1L), Duration.ofSeconds(10L))
-				.get().getTrigger();
+				.getObject().getTrigger();
 		assertThat(trigger.getPeriodDuration()).isEqualTo(Duration.ofSeconds(60));
 		assertThat(trigger.getInitialDelayDuration()).isEqualTo(Duration.ofSeconds(10));
 		assertThat(trigger.isFixedRate()).isFalse();
-		trigger = (PeriodicTrigger) Pollers.fixedRate(Duration.ofMinutes(1L)).get().getTrigger();
+		trigger = (PeriodicTrigger) Pollers.fixedRate(Duration.ofMinutes(1L)).getObject().getTrigger();
 		assertThat(trigger.getPeriodDuration()).isEqualTo(Duration.ofSeconds(60));
 		assertThat(trigger.isFixedRate()).isTrue();
 		trigger = (PeriodicTrigger) Pollers.fixedRate(Duration.ofMinutes(1L), Duration.ofSeconds(10L))
-				.get().getTrigger();
+				.getObject().getTrigger();
 		assertThat(trigger.getPeriodDuration()).isEqualTo(Duration.ofSeconds(60));
 		assertThat(trigger.getInitialDelayDuration()).isEqualTo(Duration.ofSeconds(10));
 		assertThat(trigger.isFixedRate()).isTrue();
diff --git a/spring-integration-core/src/test/java/org/springframework/integration/dsl/composition/IntegrationFlowCompositionTests.java b/spring-integration-core/src/test/java/org/springframework/integration/dsl/composition/IntegrationFlowCompositionTests.java
index 7496181d007..d3413cacf1a 100644
--- a/spring-integration-core/src/test/java/org/springframework/integration/dsl/composition/IntegrationFlowCompositionTests.java
+++ b/spring-integration-core/src/test/java/org/springframework/integration/dsl/composition/IntegrationFlowCompositionTests.java
@@ -1,5 +1,5 @@
 /*
- * Copyright 2021-2022 the original author or authors.
+ * Copyright 2021-2023 the original author or authors.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -27,6 +27,7 @@
 import org.springframework.integration.channel.QueueChannel;
 import org.springframework.integration.config.EnableIntegration;
 import org.springframework.integration.dsl.IntegrationFlow;
+import org.springframework.integration.dsl.PollerSpec;
 import org.springframework.integration.dsl.Pollers;
 import org.springframework.integration.dsl.context.IntegrationFlowContext;
 import org.springframework.integration.scheduling.PollerMetadata;
@@ -142,8 +143,8 @@ void testInvalidStartFlowForComposition() {
 	public static class ContextConfiguration {
 
 		@Bean(PollerMetadata.DEFAULT_POLLER)
-		PollerMetadata defaultPoller() {
-			return Pollers.fixedDelay(100).get();
+		PollerSpec defaultPoller() {
+			return Pollers.fixedDelay(100);
 		}
 
 		@Bean
diff --git a/spring-integration-core/src/test/java/org/springframework/integration/dsl/flows/IntegrationFlowTests.java b/spring-integration-core/src/test/java/org/springframework/integration/dsl/flows/IntegrationFlowTests.java
index 47521b3c5d3..c8c79f2df52 100644
--- a/spring-integration-core/src/test/java/org/springframework/integration/dsl/flows/IntegrationFlowTests.java
+++ b/spring-integration-core/src/test/java/org/springframework/integration/dsl/flows/IntegrationFlowTests.java
@@ -1,5 +1,5 @@
 /*
- * Copyright 2016-2022 the original author or authors.
+ * Copyright 2016-2023 the original author or authors.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -54,13 +54,16 @@
 import org.springframework.integration.channel.DirectChannel;
 import org.springframework.integration.channel.FixedSubscriberChannel;
 import org.springframework.integration.channel.NullChannel;
+import org.springframework.integration.channel.PublishSubscribeChannel;
 import org.springframework.integration.channel.QueueChannel;
 import org.springframework.integration.config.EnableIntegration;
 import org.springframework.integration.context.IntegrationContextUtils;
 import org.springframework.integration.core.GenericTransformer;
 import org.springframework.integration.dsl.IntegrationFlow;
 import org.springframework.integration.dsl.MessageChannels;
+import org.springframework.integration.dsl.PollerSpec;
 import org.springframework.integration.dsl.Pollers;
+import org.springframework.integration.dsl.QueueChannelSpec;
 import org.springframework.integration.dsl.Transformers;
 import org.springframework.integration.endpoint.AbstractEndpoint;
 import org.springframework.integration.endpoint.EventDrivenConsumer;
@@ -575,8 +578,8 @@ public IntegrationFlow supplierFlow() {
 		}
 
 		@Bean(name = PollerMetadata.DEFAULT_POLLER)
-		public PollerMetadata poller() {
-			return Pollers.fixedRate(100).get();
+		public PollerSpec poller() {
+			return Pollers.fixedRate(100);
 		}
 
 		@Bean(name = IntegrationContextUtils.TASK_SCHEDULER_BEAN_NAME)
@@ -588,8 +591,8 @@ public TaskScheduler taskScheduler() {
 
 
 		@Bean
-		public MessageChannel suppliedChannel() {
-			return MessageChannels.queue(10).get();
+		public QueueChannelSpec suppliedChannel() {
+			return MessageChannels.queue(10);
 		}
 
 	}
@@ -608,8 +611,8 @@ public IntegrationFlow supplierFlow2() {
 		}
 
 		@Bean
-		public MessageChannel suppliedChannel2() {
-			return MessageChannels.queue(10).get();
+		public QueueChannelSpec suppliedChannel2() {
+			return MessageChannels.queue(10);
 		}
 
 	}
@@ -627,12 +630,12 @@ public IntegrationFlow controlBusFlow() {
 
 		@Bean
 		public MessageChannel inputChannel() {
-			return MessageChannels.direct().get();
+			return new DirectChannel();
 		}
 
 		@Bean
 		public MessageChannel foo() {
-			return MessageChannels.publishSubscribe().get();
+			return new PublishSubscribeChannel();
 		}
 
 	}
@@ -682,7 +685,7 @@ public IntegrationFlow flow2() {
 
 		@Bean
 		public MessageChannel publishSubscribeChannel() {
-			return MessageChannels.publishSubscribe().get();
+			return new PublishSubscribeChannel();
 		}
 
 		@Bean
@@ -784,8 +787,8 @@ public static class ContextConfiguration3 {
 		private MethodInterceptor delayedAdvice;
 
 		@Bean
-		public QueueChannel successChannel() {
-			return MessageChannels.queue().get();
+		public QueueChannelSpec successChannel() {
+			return MessageChannels.queue();
 		}
 
 		@Bean
diff --git a/spring-integration-core/src/test/java/org/springframework/integration/dsl/gateway/GatewayDslTests.java b/spring-integration-core/src/test/java/org/springframework/integration/dsl/gateway/GatewayDslTests.java
index 63ce7c46fe4..08921d744c6 100644
--- a/spring-integration-core/src/test/java/org/springframework/integration/dsl/gateway/GatewayDslTests.java
+++ b/spring-integration-core/src/test/java/org/springframework/integration/dsl/gateway/GatewayDslTests.java
@@ -1,5 +1,5 @@
 /*
- * Copyright 2019-2022 the original author or authors.
+ * Copyright 2019-2023 the original author or authors.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -36,7 +36,6 @@
 import org.springframework.integration.config.EnableIntegration;
 import org.springframework.integration.core.MessagingTemplate;
 import org.springframework.integration.dsl.IntegrationFlow;
-import org.springframework.integration.dsl.MessageChannels;
 import org.springframework.integration.gateway.GatewayProxyFactoryBean;
 import org.springframework.integration.gateway.MessagingGatewaySupport;
 import org.springframework.integration.gateway.MethodArgsHolder;
@@ -189,7 +188,7 @@ public IntegrationFlow gatewayRequestFlow() {
 
 		@Bean
 		public MessageChannel gatewayError() {
-			return MessageChannels.queue().get();
+			return new QueueChannel();
 		}
 
 
diff --git a/spring-integration-core/src/test/java/org/springframework/integration/dsl/transformers/TransformerTests.java b/spring-integration-core/src/test/java/org/springframework/integration/dsl/transformers/TransformerTests.java
index 36959091455..4bdaeb1401a 100644
--- a/spring-integration-core/src/test/java/org/springframework/integration/dsl/transformers/TransformerTests.java
+++ b/spring-integration-core/src/test/java/org/springframework/integration/dsl/transformers/TransformerTests.java
@@ -30,12 +30,12 @@
 import org.springframework.context.annotation.Configuration;
 import org.springframework.integration.MessageRejectedException;
 import org.springframework.integration.annotation.Transformer;
+import org.springframework.integration.channel.DirectChannel;
 import org.springframework.integration.channel.FixedSubscriberChannel;
 import org.springframework.integration.channel.QueueChannel;
 import org.springframework.integration.codec.Codec;
 import org.springframework.integration.config.EnableIntegration;
 import org.springframework.integration.dsl.IntegrationFlow;
-import org.springframework.integration.dsl.MessageChannels;
 import org.springframework.integration.dsl.Transformers;
 import org.springframework.integration.handler.advice.AbstractRequestHandlerAdvice;
 import org.springframework.integration.handler.advice.ExpressionEvaluatingRequestHandlerAdvice;
@@ -423,7 +423,7 @@ public IntegrationFlow replyProducingSubFlowEnricher() {
 
 		@Bean
 		public MessageChannel enricherReplyChannel() {
-			return MessageChannels.direct().get();
+			return new DirectChannel();
 		}
 
 		@Bean
diff --git a/spring-integration-core/src/test/java/org/springframework/integration/json/ContentTypeConversionTests.java b/spring-integration-core/src/test/java/org/springframework/integration/json/ContentTypeConversionTests.java
index 56ed5d07ee3..9f9afd651d1 100644
--- a/spring-integration-core/src/test/java/org/springframework/integration/json/ContentTypeConversionTests.java
+++ b/spring-integration-core/src/test/java/org/springframework/integration/json/ContentTypeConversionTests.java
@@ -1,5 +1,5 @@
 /*
- * Copyright 2017-2022 the original author or authors.
+ * Copyright 2017-2023 the original author or authors.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -20,8 +20,7 @@
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicReference;
 
-import org.junit.Test;
-import org.junit.runner.RunWith;
+import org.junit.jupiter.api.Test;
 
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.context.annotation.Bean;
@@ -32,6 +31,7 @@
 import org.springframework.integration.annotation.MessagingGateway;
 import org.springframework.integration.annotation.ServiceActivator;
 import org.springframework.integration.config.EnableIntegration;
+import org.springframework.integration.dsl.DirectChannelSpec;
 import org.springframework.integration.dsl.IntegrationFlow;
 import org.springframework.integration.dsl.MessageChannels;
 import org.springframework.messaging.Message;
@@ -39,7 +39,7 @@
 import org.springframework.messaging.MessageHeaders;
 import org.springframework.messaging.handler.annotation.Payload;
 import org.springframework.messaging.support.ChannelInterceptor;
-import org.springframework.test.context.junit4.SpringRunner;
+import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
@@ -48,7 +48,7 @@
  *
  * @since 5.0
  */
-@RunWith(SpringRunner.class)
+@SpringJUnitConfig
 public class ContentTypeConversionTests {
 
 	@Autowired
@@ -91,7 +91,7 @@ public AtomicReference sendData() {
 		}
 
 		@Bean
-		public MessageChannel serviceChannel(final AtomicReference sendData) {
+		public DirectChannelSpec serviceChannel(AtomicReference sendData) {
 			return MessageChannels.direct()
 					.interceptor(new ChannelInterceptor() {
 
@@ -101,8 +101,7 @@ public Message preSend(Message message, MessageChannel channel) {
 							return message;
 						}
 
-					})
-					.get();
+					});
 		}
 
 		@Bean
diff --git a/spring-integration-core/src/test/kotlin/org/springframework/integration/dsl/KotlinDslTests.kt b/spring-integration-core/src/test/kotlin/org/springframework/integration/dsl/KotlinDslTests.kt
index 23bb5c5cb14..bf0f87f5898 100644
--- a/spring-integration-core/src/test/kotlin/org/springframework/integration/dsl/KotlinDslTests.kt
+++ b/spring-integration-core/src/test/kotlin/org/springframework/integration/dsl/KotlinDslTests.kt
@@ -1,5 +1,5 @@
 /*
- * Copyright 2020-2021 the original author or authors.
+ * Copyright 2020-2023 the original author or authors.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -231,7 +231,7 @@ class KotlinDslTests {
 
 		@Bean(PollerMetadata.DEFAULT_POLLER)
 		fun defaultPoller() =
-			Pollers.fixedDelay(100).maxMessagesPerPoll(1).get()
+			Pollers.fixedDelay(100).maxMessagesPerPoll(1)
 
 		@Bean
 		fun convertFlow() =
diff --git a/spring-integration-groovy/src/test/groovy/org/springframework/integration/groovy/dsl/test/GroovyDslTests.groovy b/spring-integration-groovy/src/test/groovy/org/springframework/integration/groovy/dsl/test/GroovyDslTests.groovy
index a7b38e396fd..27ed13d271b 100644
--- a/spring-integration-groovy/src/test/groovy/org/springframework/integration/groovy/dsl/test/GroovyDslTests.groovy
+++ b/spring-integration-groovy/src/test/groovy/org/springframework/integration/groovy/dsl/test/GroovyDslTests.groovy
@@ -216,7 +216,7 @@ class GroovyDslTests {
 
 		@Bean(PollerMetadata.DEFAULT_POLLER)
 		poller() {
-			Pollers.fixedDelay(1000).get()
+			Pollers.fixedDelay(1000)
 		}
 
 
diff --git a/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/TcpInboundChannelAdapterSpec.java b/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/TcpInboundChannelAdapterSpec.java
index 4c3549a6394..3db20370d04 100644
--- a/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/TcpInboundChannelAdapterSpec.java
+++ b/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/TcpInboundChannelAdapterSpec.java
@@ -1,5 +1,5 @@
 /*
- * Copyright 2016-2020 the original author or authors.
+ * Copyright 2016-2023 the original author or authors.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -56,7 +56,7 @@ protected TcpInboundChannelAdapterSpec(AbstractConnectionFactory connectionFacto
 	 */
 	protected TcpInboundChannelAdapterSpec(AbstractConnectionFactorySpec connectionFactorySpec) {
 		super(new TcpReceivingChannelAdapter());
-		this.connectionFactory = connectionFactorySpec.get();
+		this.connectionFactory = connectionFactorySpec.getObject();
 		this.target.setConnectionFactory(this.connectionFactory);
 	}
 
@@ -94,7 +94,7 @@ public TcpInboundChannelAdapterSpec taskScheduler(TaskScheduler taskScheduler) {
 	public Map getComponentsToRegister() {
 		return this.connectionFactory != null
 				? Collections.singletonMap(this.connectionFactory, this.connectionFactory.getComponentName())
-				: null;
+				: Collections.emptyMap();
 	}
 
 }
diff --git a/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/TcpInboundGatewaySpec.java b/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/TcpInboundGatewaySpec.java
index d24018b193e..0cd601dfc0c 100644
--- a/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/TcpInboundGatewaySpec.java
+++ b/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/TcpInboundGatewaySpec.java
@@ -1,5 +1,5 @@
 /*
- * Copyright 2016-2020 the original author or authors.
+ * Copyright 2016-2023 the original author or authors.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -55,7 +55,7 @@ protected TcpInboundGatewaySpec(AbstractConnectionFactory connectionFactoryBean)
 	 */
 	protected TcpInboundGatewaySpec(AbstractConnectionFactorySpec connectionFactorySpec) {
 		super(new TcpInboundGateway());
-		this.connectionFactory = connectionFactorySpec.get();
+		this.connectionFactory = connectionFactorySpec.getObject();
 		this.target.setConnectionFactory(this.connectionFactory);
 	}
 
@@ -93,7 +93,7 @@ public TcpInboundGatewaySpec taskScheduler(TaskScheduler taskScheduler) {
 	public Map getComponentsToRegister() {
 		return this.connectionFactory != null
 				? Collections.singletonMap(this.connectionFactory, this.connectionFactory.getComponentName())
-				: null;
+				: Collections.emptyMap();
 	}
 
 }
diff --git a/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/TcpOutboundChannelAdapterSpec.java b/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/TcpOutboundChannelAdapterSpec.java
index 211aa47ce02..e1d510e5768 100644
--- a/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/TcpOutboundChannelAdapterSpec.java
+++ b/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/TcpOutboundChannelAdapterSpec.java
@@ -1,5 +1,5 @@
 /*
- * Copyright 2016-2020 the original author or authors.
+ * Copyright 2016-2023 the original author or authors.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -56,7 +56,7 @@ protected TcpOutboundChannelAdapterSpec(AbstractConnectionFactory connectionFact
 	 */
 	protected TcpOutboundChannelAdapterSpec(AbstractConnectionFactorySpec connectionFactorySpec) {
 		this.target = new TcpSendingMessageHandler();
-		this.connectionFactory = connectionFactorySpec.get();
+		this.connectionFactory = connectionFactorySpec.getObject();
 		this.target.setConnectionFactory(this.connectionFactory);
 	}
 
@@ -94,7 +94,7 @@ public TcpOutboundChannelAdapterSpec taskScheduler(TaskScheduler taskScheduler)
 	public Map getComponentsToRegister() {
 		return this.connectionFactory != null
 				? Collections.singletonMap(this.connectionFactory, this.connectionFactory.getComponentName())
-				: null;
+				: Collections.emptyMap();
 	}
 
 }
diff --git a/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/TcpOutboundGatewaySpec.java b/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/TcpOutboundGatewaySpec.java
index 9e2ba4acbd2..5737bacb503 100644
--- a/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/TcpOutboundGatewaySpec.java
+++ b/spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/TcpOutboundGatewaySpec.java
@@ -58,7 +58,7 @@ public TcpOutboundGatewaySpec(AbstractClientConnectionFactory connectionFactoryB
 	 */
 	public TcpOutboundGatewaySpec(TcpClientConnectionFactorySpec connectionFactorySpec) {
 		this.target = new TcpOutboundGateway();
-		this.connectionFactory = connectionFactorySpec.get();
+		this.connectionFactory = connectionFactorySpec.getObject();
 		this.target.setConnectionFactory(this.connectionFactory);
 	}
 
@@ -120,8 +120,21 @@ public TcpOutboundGatewaySpec async(boolean async) {
 	 * @param channelName the name.
 	 * @return the spec.
 	 * @since 5.4
+	 * @deprecated in favor of {@link #unsolicitedMessageChannelName(String)}
+	 * due to the typo in method name.
 	 */
+	@Deprecated(since = "6.1", forRemoval = true)
 	public TcpOutboundGatewaySpec unsolictedMessageChannelName(String channelName) {
+		return unsolicitedMessageChannelName(channelName);
+	}
+
+	/**
+	 * Set the unsolicited message channel name.
+	 * @param channelName the name.
+	 * @return the spec.
+	 * @since 6.1
+	 */
+	public TcpOutboundGatewaySpec unsolicitedMessageChannelName(String channelName) {
 		this.target.setUnsolicitedMessageChannelName(channelName);
 		return this;
 	}
@@ -131,8 +144,21 @@ public TcpOutboundGatewaySpec unsolictedMessageChannelName(String channelName) {
 	 * @param channel the channel.
 	 * @return the spec.
 	 * @since 5.4
+	 * @deprecated in favor of {@link #unsolicitedMessageChannel(MessageChannel)}
+	 * due to the typo in method name.
 	 */
+	@Deprecated(since = "6.1", forRemoval = true)
 	public TcpOutboundGatewaySpec unsolictedMessageChannelName(MessageChannel channel) {
+		return unsolicitedMessageChannel(channel);
+	}
+
+	/**
+	 * Set the unsolicited message channel.
+	 * @param channel the channel.
+	 * @return the spec.
+	 * @since 6.1
+	 */
+	public TcpOutboundGatewaySpec unsolicitedMessageChannel(MessageChannel channel) {
 		this.target.setUnsolicitedMessageChannel(channel);
 		return this;
 	}
@@ -141,7 +167,7 @@ public TcpOutboundGatewaySpec unsolictedMessageChannelName(MessageChannel channe
 	public Map getComponentsToRegister() {
 		return this.connectionFactory != null
 				? Collections.singletonMap(this.connectionFactory, this.connectionFactory.getComponentName())
-				: null;
+				: Collections.emptyMap();
 	}
 
 }
diff --git a/spring-integration-ip/src/test/java/org/springframework/integration/ip/dsl/ConnectionFacforyTests.java b/spring-integration-ip/src/test/java/org/springframework/integration/ip/dsl/ConnectionFactoryTests.java
similarity index 92%
rename from spring-integration-ip/src/test/java/org/springframework/integration/ip/dsl/ConnectionFacforyTests.java
rename to spring-integration-ip/src/test/java/org/springframework/integration/ip/dsl/ConnectionFactoryTests.java
index d30783520b5..bd03f34dd7f 100644
--- a/spring-integration-ip/src/test/java/org/springframework/integration/ip/dsl/ConnectionFacforyTests.java
+++ b/spring-integration-ip/src/test/java/org/springframework/integration/ip/dsl/ConnectionFactoryTests.java
@@ -45,16 +45,18 @@
 /**
  * @author Gary Russell
  * @author Tim Ysewyn
+ * @author Artem Bilan
+ *
  * @since 5.0
  *
  */
-public class ConnectionFacforyTests {
+public class ConnectionFactoryTests {
 
 	@Test
 	public void test() throws Exception {
 		ApplicationEventPublisher publisher = e -> {
 		};
-		AbstractServerConnectionFactory server = Tcp.netServer(0).backlog(2).soTimeout(5000).get();
+		AbstractServerConnectionFactory server = Tcp.netServer(0).backlog(2).soTimeout(5000).getObject();
 		final AtomicReference> received = new AtomicReference<>();
 		final CountDownLatch latch = new CountDownLatch(1);
 		server.registerListener(m -> {
@@ -66,7 +68,7 @@ public void test() throws Exception {
 		server.afterPropertiesSet();
 		server.start();
 		TestingUtilities.waitListening(server, null);
-		AbstractClientConnectionFactory client = Tcp.netClient("localhost", server.getPort()).get();
+		AbstractClientConnectionFactory client = Tcp.netClient("localhost", server.getPort()).getObject();
 		client.setApplicationEventPublisher(publisher);
 		client.afterPropertiesSet();
 		client.start();
@@ -78,20 +80,20 @@ public void test() throws Exception {
 	}
 
 	@Test
-	public void shouldReturnNioFlavor() throws Exception {
-		AbstractServerConnectionFactory server = Tcp.nioServer(0).get();
+	public void shouldReturnNioFlavor() {
+		AbstractServerConnectionFactory server = Tcp.nioServer(0).getObject();
 		assertThat(server instanceof TcpNioServerConnectionFactory).isTrue();
 
-		AbstractClientConnectionFactory client = Tcp.nioClient("localhost", server.getPort()).get();
+		AbstractClientConnectionFactory client = Tcp.nioClient("localhost", server.getPort()).getObject();
 		assertThat(client instanceof TcpNioClientConnectionFactory).isTrue();
 	}
 
 	@Test
-	public void shouldReturnNetFlavor() throws Exception {
-		AbstractServerConnectionFactory server = Tcp.netServer(0).get();
+	public void shouldReturnNetFlavor() {
+		AbstractServerConnectionFactory server = Tcp.netServer(0).getObject();
 		assertThat(server instanceof TcpNetServerConnectionFactory).isTrue();
 
-		AbstractClientConnectionFactory client = Tcp.netClient("localhost", server.getPort()).get();
+		AbstractClientConnectionFactory client = Tcp.netClient("localhost", server.getPort()).getObject();
 		assertThat(client instanceof TcpNetClientConnectionFactory).isTrue();
 	}
 
@@ -104,7 +106,7 @@ void netCustomServer() {
 				.socketSupport(sockSupp)
 				.connectionSupport(conSupp)
 				.socketFactorySupport(factSupp)
-				.get();
+				.getObject();
 		assertThat(TestUtils.getPropertyValue(server, "tcpSocketSupport")).isSameAs(sockSupp);
 		assertThat(TestUtils.getPropertyValue(server, "tcpNetConnectionSupport")).isSameAs(conSupp);
 		assertThat(TestUtils.getPropertyValue(server, "tcpSocketFactorySupport")).isSameAs(factSupp);
@@ -118,7 +120,7 @@ void nioCustomServer() {
 				.socketSupport(sockSupp)
 				.directBuffers(true)
 				.connectionSupport(conSupp)
-				.get();
+				.getObject();
 		assertThat(TestUtils.getPropertyValue(server, "tcpSocketSupport")).isSameAs(sockSupp);
 		assertThat(TestUtils.getPropertyValue(server, "usingDirectBuffers", Boolean.class)).isTrue();
 		assertThat(TestUtils.getPropertyValue(server, "tcpNioConnectionSupport")).isSameAs(conSupp);
@@ -133,7 +135,7 @@ void netCustomClient() {
 				.socketSupport(sockSupp)
 				.connectionSupport(conSupp)
 				.socketFactorySupport(factSupp)
-				.get();
+				.getObject();
 		assertThat(TestUtils.getPropertyValue(client, "tcpSocketSupport")).isSameAs(sockSupp);
 		assertThat(TestUtils.getPropertyValue(client, "tcpNetConnectionSupport")).isSameAs(conSupp);
 		assertThat(TestUtils.getPropertyValue(client, "tcpSocketFactorySupport")).isSameAs(factSupp);
@@ -147,7 +149,7 @@ void nioCustomClient() {
 				.socketSupport(sockSupp)
 				.directBuffers(true)
 				.connectionSupport(conSupp)
-				.get();
+				.getObject();
 		assertThat(TestUtils.getPropertyValue(client, "tcpSocketSupport")).isSameAs(sockSupp);
 		assertThat(TestUtils.getPropertyValue(client, "usingDirectBuffers", Boolean.class)).isTrue();
 		assertThat(TestUtils.getPropertyValue(client, "tcpNioConnectionSupport")).isSameAs(conSupp);
diff --git a/spring-integration-ip/src/test/java/org/springframework/integration/ip/dsl/IpIntegrationTests.java b/spring-integration-ip/src/test/java/org/springframework/integration/ip/dsl/IpIntegrationTests.java
index 6622bfe0b58..9e5501d08ca 100644
--- a/spring-integration-ip/src/test/java/org/springframework/integration/ip/dsl/IpIntegrationTests.java
+++ b/spring-integration-ip/src/test/java/org/springframework/integration/ip/dsl/IpIntegrationTests.java
@@ -1,5 +1,5 @@
 /*
- * Copyright 2016-2022 the original author or authors.
+ * Copyright 2016-2023 the original author or authors.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -48,6 +48,8 @@
 import org.springframework.integration.ip.tcp.connection.AbstractClientConnectionFactory;
 import org.springframework.integration.ip.tcp.connection.AbstractServerConnectionFactory;
 import org.springframework.integration.ip.tcp.connection.TcpConnectionServerListeningEvent;
+import org.springframework.integration.ip.tcp.connection.TcpNetClientConnectionFactory;
+import org.springframework.integration.ip.tcp.connection.TcpNetServerConnectionFactory;
 import org.springframework.integration.ip.tcp.serializer.ByteArrayRawSerializer;
 import org.springframework.integration.ip.tcp.serializer.TcpCodecs;
 import org.springframework.integration.ip.udp.MulticastSendingMessageHandler;
@@ -126,21 +128,21 @@ public class IpIntegrationTests {
 	void testTcpAdapters() {
 		ApplicationEventPublisher publisher = e -> {
 		};
-		AbstractServerConnectionFactory server = Tcp.netServer(0).backlog(2).soTimeout(5000).id("server").get();
+		AbstractServerConnectionFactory server = Tcp.netServer(0).backlog(2).soTimeout(5000).id("server").getObject();
 		assertThat(server.getComponentName()).isEqualTo("server");
 		server.setApplicationEventPublisher(publisher);
 		server.afterPropertiesSet();
-		TcpReceivingChannelAdapter inbound = Tcp.inboundAdapter(server).get();
+		TcpReceivingChannelAdapter inbound = Tcp.inboundAdapter(server).getObject();
 		QueueChannel received = new QueueChannel();
 		inbound.setOutputChannel(received);
 		inbound.afterPropertiesSet();
 		inbound.start();
 		TestingUtilities.waitListening(server, null);
-		AbstractClientConnectionFactory client = Tcp.netClient("localhost", server.getPort()).id("client").get();
+		AbstractClientConnectionFactory client = Tcp.netClient("localhost", server.getPort()).id("client").getObject();
 		assertThat(client.getComponentName()).isEqualTo("client");
 		client.setApplicationEventPublisher(publisher);
 		client.afterPropertiesSet();
-		TcpSendingMessageHandler handler = Tcp.outboundAdapter(client).get();
+		TcpSendingMessageHandler handler = Tcp.outboundAdapter(client).getObject();
 		handler.start();
 		handler.handleMessage(new GenericMessage<>("foo"));
 		Message receivedMessage = received.receive(10000);
@@ -193,7 +195,8 @@ void testUdpInheritance() {
 		UdpMulticastOutboundChannelAdapterSpec udpMulticastOutboundChannelAdapterSpec2 =
 				udpMulticastOutboundChannelAdapterSpec1.timeToLive(10);
 
-		assertThat(udpMulticastOutboundChannelAdapterSpec2.get()).isInstanceOf(MulticastSendingMessageHandler.class);
+		assertThat(udpMulticastOutboundChannelAdapterSpec2.getObject())
+				.isInstanceOf(MulticastSendingMessageHandler.class);
 	}
 
 	@Test
@@ -249,17 +252,16 @@ public static class Config {
 		private volatile String connectionId;
 
 		@Bean
-		public AbstractServerConnectionFactory server1() {
+		public TcpNetServerConnectionFactorySpec server1() {
 			return Tcp.netServer(0)
 					.serializer(TcpCodecs.lengthHeader1())
-					.deserializer(TcpCodecs.crlf())
-					.get();
+					.deserializer(TcpCodecs.crlf());
 		}
 
 		@Bean
-		public IntegrationFlow inTcpGateway() {
+		public IntegrationFlow inTcpGateway(TcpNetServerConnectionFactory server1) {
 			return IntegrationFlow.from(
-							Tcp.inboundGateway(server1())
+							Tcp.inboundGateway(server1)
 									.replyTimeout(1)
 									.errorOnTimeout(true)
 									.errorChannel("inTcpGatewayErrorFlow.input"))
@@ -276,8 +278,8 @@ public Message captureId(Message msg) {
 		}
 
 		@Bean
-		public IntegrationFlow unsolicitedServerSide() {
-			return f -> f.handle(Tcp.outboundAdapter(server1()));
+		public IntegrationFlow unsolicitedServerSide(TcpNetServerConnectionFactory server1) {
+			return f -> f.handle(Tcp.outboundAdapter(server1));
 		}
 
 		@Bean
@@ -321,19 +323,17 @@ public ApplicationListener events() {
 		}
 
 		@Bean
-		public AbstractClientConnectionFactory client1() {
-			return Tcp.netClient("localhost", server1().getPort())
+		public TcpNetClientConnectionFactorySpec client1(TcpNetServerConnectionFactory server1) {
+			return Tcp.netClient("localhost", server1.getPort())
 					.serializer(TcpCodecs.crlf())
-					.deserializer(TcpCodecs.lengthHeader1())
-					.get();
+					.deserializer(TcpCodecs.lengthHeader1());
 		}
 
 		@Bean
-		public TcpOutboundGateway tcpOut() {
-			return Tcp.outboundGateway(client1())
+		public TcpOutboundGatewaySpec tcpOut(TcpNetClientConnectionFactory client1) {
+			return Tcp.outboundGateway(client1)
 					.remoteTimeout(m -> 5000)
-					.unsolictedMessageChannelName("unsolicited")
-					.get();
+					.unsolicitedMessageChannelName("unsolicited");
 		}
 
 		@Bean
@@ -342,19 +342,17 @@ public QueueChannel unsolicited() {
 		}
 
 		@Bean
-		public AbstractClientConnectionFactory client2() {
-			return Tcp.netClient("localhost", server1().getPort())
+		public TcpNetClientConnectionFactorySpec client2(TcpNetServerConnectionFactory server1) {
+			return Tcp.netClient("localhost", server1.getPort())
 					.serializer(TcpCodecs.crlf())
-					.deserializer(TcpCodecs.lengthHeader1())
-					.get();
+					.deserializer(TcpCodecs.lengthHeader1());
 		}
 
 		@Bean
-		public TcpOutboundGateway tcpOutAsync() {
-			return Tcp.outboundGateway(client2())
+		public TcpOutboundGatewaySpec tcpOutAsync(TcpNetClientConnectionFactory client2) {
+			return Tcp.outboundGateway(client2)
 					.async(true)
-					.remoteTimeout(m -> 5000)
-					.get();
+					.remoteTimeout(m -> 5000);
 		}
 
 		@Bean
@@ -371,9 +369,9 @@ public MethodInterceptor testAdvice() {
 		}
 
 		@Bean
-		public IntegrationFlow clientTcpFlow() {
+		public IntegrationFlow clientTcpFlow(TcpOutboundGateway tcpOut) {
 			return f -> f
-					.handle(tcpOut(), e -> e.advice(testAdvice()))
+					.handle(tcpOut, e -> e.advice(testAdvice()))
 					.transform(Transformers.objectToString());
 		}
 
diff --git a/spring-integration-jms/src/main/java/org/springframework/integration/jms/dsl/Jms.java b/spring-integration-jms/src/main/java/org/springframework/integration/jms/dsl/Jms.java
index 4f6b330ab71..e3cba216ff7 100644
--- a/spring-integration-jms/src/main/java/org/springframework/integration/jms/dsl/Jms.java
+++ b/spring-integration-jms/src/main/java/org/springframework/integration/jms/dsl/Jms.java
@@ -1,5 +1,5 @@
 /*
- * Copyright 2014-2021 the original author or authors.
+ * Copyright 2014-2023 the original author or authors.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -208,7 +208,7 @@ JmsInboundGatewaySpec.JmsInboundGatewayListenerContainerSpec inboundGatewa
 	public static JmsMessageDrivenChannelAdapterSpec messageDrivenChannelAdapter(
 			JmsListenerContainerSpec jmsListenerContainerSpec) {
 
-		return new JmsMessageDrivenChannelAdapterSpec<>(jmsListenerContainerSpec.get());
+		return new JmsMessageDrivenChannelAdapterSpec<>(jmsListenerContainerSpec.getObject());
 	}
 
 	/**
@@ -216,7 +216,9 @@ public static JmsMessageDrivenChannelAdapterSpec messageDrivenChannelAdapter(
 	 * @param listenerContainer the {@link AbstractMessageListenerContainer} to build on
 	 * @return the {@link JmsMessageDrivenChannelAdapterSpec} instance
 	 */
-	public static JmsMessageDrivenChannelAdapterSpec messageDrivenChannelAdapter(AbstractMessageListenerContainer listenerContainer) {
+	public static JmsMessageDrivenChannelAdapterSpec messageDrivenChannelAdapter(
+			AbstractMessageListenerContainer listenerContainer) {
+
 		return new JmsMessageDrivenChannelAdapterSpec<>(listenerContainer);
 	}
 
@@ -227,14 +229,10 @@ public static JmsMessageDrivenChannelAdapterSpec messageDrivenChannelAdapter(
 	 */
 	public static JmsMessageDrivenChannelAdapterSpec.JmsMessageDrivenChannelAdapterListenerContainerSpec
 	messageDrivenChannelAdapter(ConnectionFactory connectionFactory) {
-		try {
-			return new JmsMessageDrivenChannelAdapterSpec.JmsMessageDrivenChannelAdapterListenerContainerSpec<>(
-					new JmsDefaultListenerContainerSpec()
-							.connectionFactory(connectionFactory));
-		}
-		catch (Exception e) {
-			throw new IllegalStateException(e);
-		}
+
+		return new JmsMessageDrivenChannelAdapterSpec.JmsMessageDrivenChannelAdapterListenerContainerSpec<>(
+				new JmsDefaultListenerContainerSpec()
+						.connectionFactory(connectionFactory));
 	}
 
 	/**
@@ -249,15 +247,11 @@ public static JmsMessageDrivenChannelAdapterSpec messageDrivenChannelAdapter(
 	public static 
 	JmsMessageDrivenChannelAdapterSpec.JmsMessageDrivenChannelAdapterListenerContainerSpec
 	messageDrivenChannelAdapter(ConnectionFactory connectionFactory, Class containerClass) {
-		try {
-			JmsListenerContainerSpec spec =
-					new JmsListenerContainerSpec<>(containerClass)
-							.connectionFactory(connectionFactory);
-			return new JmsMessageDrivenChannelAdapterSpec.JmsMessageDrivenChannelAdapterListenerContainerSpec(spec);
-		}
-		catch (Exception e) {
-			throw new IllegalStateException(e);
-		}
+
+		JmsListenerContainerSpec spec =
+				new JmsListenerContainerSpec<>(containerClass)
+						.connectionFactory(connectionFactory);
+		return new JmsMessageDrivenChannelAdapterSpec.JmsMessageDrivenChannelAdapterListenerContainerSpec(spec);
 	}
 
 	/**
@@ -268,14 +262,10 @@ public static JmsMessageDrivenChannelAdapterSpec messageDrivenChannelAdapter(
 	 */
 	public static JmsDefaultListenerContainerSpec container(ConnectionFactory connectionFactory,
 			Destination destination) {
-		try {
-			return new JmsDefaultListenerContainerSpec()
-					.connectionFactory(connectionFactory)
-					.destination(destination);
-		}
-		catch (Exception e) {
-			throw new IllegalStateException(e);
-		}
+
+		return new JmsDefaultListenerContainerSpec()
+				.connectionFactory(connectionFactory)
+				.destination(destination);
 	}
 
 	/**
@@ -286,14 +276,10 @@ public static JmsDefaultListenerContainerSpec container(ConnectionFactory connec
 	 */
 	public static JmsDefaultListenerContainerSpec container(ConnectionFactory connectionFactory,
 			String destinationName) {
-		try {
-			return new JmsDefaultListenerContainerSpec()
-					.connectionFactory(connectionFactory)
-					.destination(destinationName);
-		}
-		catch (Exception e) {
-			throw new IllegalStateException(e);
-		}
+
+		return new JmsDefaultListenerContainerSpec()
+				.connectionFactory(connectionFactory)
+				.destination(destinationName);
 	}
 
 	private Jms() {
diff --git a/spring-integration-jms/src/main/java/org/springframework/integration/jms/dsl/JmsInboundChannelAdapterSpec.java b/spring-integration-jms/src/main/java/org/springframework/integration/jms/dsl/JmsInboundChannelAdapterSpec.java
index 156375bf073..7f21ed36025 100644
--- a/spring-integration-jms/src/main/java/org/springframework/integration/jms/dsl/JmsInboundChannelAdapterSpec.java
+++ b/spring-integration-jms/src/main/java/org/springframework/integration/jms/dsl/JmsInboundChannelAdapterSpec.java
@@ -1,5 +1,5 @@
 /*
- * Copyright 2016-2021 the original author or authors.
+ * Copyright 2016-2023 the original author or authors.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -49,7 +49,8 @@ protected JmsInboundChannelAdapterSpec(JmsTemplate jmsTemplate) {
 	}
 
 	private JmsInboundChannelAdapterSpec(ConnectionFactory connectionFactory) {
-		this.target = new JmsDestinationPollingSource(this.jmsTemplateSpec.connectionFactory(connectionFactory).get());
+		this.target =
+				new JmsDestinationPollingSource(this.jmsTemplateSpec.connectionFactory(connectionFactory).getObject());
 	}
 
 	/**
@@ -118,7 +119,7 @@ public JmsInboundChannelSpecTemplateAware configureJmsTemplate(Consumer getComponentsToRegister() {
-			return Collections.singletonMap(this.jmsTemplateSpec.get(), this.jmsTemplateSpec.getId());
+			return Collections.singletonMap(this.jmsTemplateSpec.getObject(), this.jmsTemplateSpec.getId());
 		}
 
 	}
diff --git a/spring-integration-jms/src/main/java/org/springframework/integration/jms/dsl/JmsInboundGatewaySpec.java b/spring-integration-jms/src/main/java/org/springframework/integration/jms/dsl/JmsInboundGatewaySpec.java
index f9539f4bbc1..d84d45a7033 100644
--- a/spring-integration-jms/src/main/java/org/springframework/integration/jms/dsl/JmsInboundGatewaySpec.java
+++ b/spring-integration-jms/src/main/java/org/springframework/integration/jms/dsl/JmsInboundGatewaySpec.java
@@ -246,9 +246,9 @@ public static class JmsInboundGatewayListenerContainerSpec destination(Str
 		 */
 		public JmsMessageDrivenChannelAdapterListenerContainerSpec configureListenerContainer(
 				Consumer configurer) {
+
 			Assert.notNull(configurer, "'configurer' must not be null");
 			configurer.accept(this.spec);
 			return _this();
@@ -148,7 +149,7 @@ public JmsMessageDrivenChannelAdapterListenerContainerSpec configureListen
 
 		@Override
 		public Map getComponentsToRegister() {
-			return Collections.singletonMap(this.spec.get(), this.spec.getId());
+			return Collections.singletonMap(this.spec.getObject(), this.spec.getId());
 		}
 
 	}
diff --git a/spring-integration-jms/src/main/java/org/springframework/integration/jms/dsl/JmsOutboundChannelAdapterSpec.java b/spring-integration-jms/src/main/java/org/springframework/integration/jms/dsl/JmsOutboundChannelAdapterSpec.java
index 02cf087bfef..4dade6b2f6c 100644
--- a/spring-integration-jms/src/main/java/org/springframework/integration/jms/dsl/JmsOutboundChannelAdapterSpec.java
+++ b/spring-integration-jms/src/main/java/org/springframework/integration/jms/dsl/JmsOutboundChannelAdapterSpec.java
@@ -1,5 +1,5 @@
 /*
- * Copyright 2016-2021 the original author or authors.
+ * Copyright 2016-2023 the original author or authors.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -51,7 +51,8 @@ protected JmsOutboundChannelAdapterSpec(JmsTemplate jmsTemplate) {
 	}
 
 	private JmsOutboundChannelAdapterSpec(ConnectionFactory connectionFactory) {
-		this.target = new JmsSendingMessageHandler(this.jmsTemplateSpec.connectionFactory(connectionFactory).get());
+		this.target =
+				new JmsSendingMessageHandler(this.jmsTemplateSpec.connectionFactory(connectionFactory).getObject());
 	}
 
 	/**
@@ -101,7 +102,7 @@ public S destination(String destination) {
 	 * which a message will be sent.
 	 * @param destination the destination name.
 	 * @return the current {@link JmsOutboundChannelAdapterSpec}.
-	 * @see JmsSendingMessageHandler#setDestinationExpression(Expression)
+	 * @see JmsSendingMessageHandler#setDestinationExpression
 	 */
 	public S destinationExpression(String destination) {
 		this.target.setDestinationExpression(PARSER.parseExpression(destination));
@@ -119,7 +120,7 @@ public S destinationExpression(String destination) {
 	 * @param destinationFunction the destination function.
 	 * @param 

the expected payload type. * @return the current {@link JmsOutboundChannelAdapterSpec}. - * @see JmsSendingMessageHandler#setDestinationExpression(Expression) + * @see JmsSendingMessageHandler#setDestinationExpression * @see FunctionExpression */ public

S destination(Function, ?> destinationFunction) { @@ -194,7 +195,7 @@ public JmsOutboundChannelSpecTemplateAware configureJmsTemplate(Consumer getComponentsToRegister() { - return Collections.singletonMap(this.jmsTemplateSpec.get(), this.jmsTemplateSpec.getId()); + return Collections.singletonMap(this.jmsTemplateSpec.getObject(), this.jmsTemplateSpec.getId()); } } diff --git a/spring-integration-jms/src/main/java/org/springframework/integration/jms/dsl/JmsOutboundGatewaySpec.java b/spring-integration-jms/src/main/java/org/springframework/integration/jms/dsl/JmsOutboundGatewaySpec.java index 5b3cb674c59..90172547e22 100644 --- a/spring-integration-jms/src/main/java/org/springframework/integration/jms/dsl/JmsOutboundGatewaySpec.java +++ b/spring-integration-jms/src/main/java/org/springframework/integration/jms/dsl/JmsOutboundGatewaySpec.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2021 the original author or authors. + * Copyright 2016-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -296,7 +296,7 @@ public JmsOutboundGatewaySpec replyContainer(Consumer config Assert.notNull(configurer, "'configurer' must not be null"); ReplyContainerSpec spec = new ReplyContainerSpec(); configurer.accept(spec); - this.target.setReplyContainerProperties(spec.get()); + this.target.setReplyContainerProperties(spec.getObject()); return _this(); } diff --git a/spring-integration-jms/src/test/java/org/springframework/integration/jms/dsl/JmsTests.java b/spring-integration-jms/src/test/java/org/springframework/integration/jms/dsl/JmsTests.java index e50d4c44506..fc4c87035fe 100644 --- a/spring-integration-jms/src/test/java/org/springframework/integration/jms/dsl/JmsTests.java +++ b/spring-integration-jms/src/test/java/org/springframework/integration/jms/dsl/JmsTests.java @@ -36,7 +36,6 @@ import org.springframework.integration.annotation.IntegrationComponentScan; import org.springframework.integration.annotation.MessagingGateway; import org.springframework.integration.annotation.Poller; -import org.springframework.integration.channel.BroadcastCapableChannel; import org.springframework.integration.channel.DirectChannel; import org.springframework.integration.channel.FixedSubscriberChannel; import org.springframework.integration.channel.QueueChannel; @@ -46,11 +45,14 @@ import org.springframework.integration.dsl.IntegrationFlow; import org.springframework.integration.dsl.IntegrationFlowDefinition; import org.springframework.integration.dsl.MessageChannels; +import org.springframework.integration.dsl.PollerSpec; import org.springframework.integration.dsl.Pollers; +import org.springframework.integration.dsl.QueueChannelSpec; import org.springframework.integration.endpoint.MethodInvokingMessageSource; import org.springframework.integration.handler.AbstractReplyProducingMessageHandler; import org.springframework.integration.jms.ActiveMQMultiContextTests; import org.springframework.integration.jms.JmsDestinationPollingSource; +import org.springframework.integration.jms.SubscribableJmsChannel; import org.springframework.integration.scheduling.PollerMetadata; import org.springframework.integration.support.MessageBuilder; import org.springframework.integration.test.util.TestUtils; @@ -302,8 +304,8 @@ public JmsTemplate jmsTemplate() { } @Bean(name = PollerMetadata.DEFAULT_POLLER) - public PollerMetadata poller() { - return Pollers.fixedDelay(1000).get(); + public PollerSpec poller() { + return Pollers.fixedDelay(1000); } @Bean @@ -338,30 +340,29 @@ public IntegrationFlow jmsOutboundFlow() { } @Bean - public MessageChannel jmsOutboundInboundReplyChannel() { - return MessageChannels.queue().get(); + public QueueChannelSpec jmsOutboundInboundReplyChannel() { + return MessageChannels.queue(); } @Bean - public IntegrationFlow jmsInboundFlow() { + public IntegrationFlow jmsInboundFlow(QueueChannel jmsOutboundInboundReplyChannel) { return IntegrationFlow .from(Jms.inboundAdapter(amqFactory).destination("jmsInbound")) .transform(String::toUpperCase) - .channel(this.jmsOutboundInboundReplyChannel()) + .channel(jmsOutboundInboundReplyChannel) .get(); } @Bean - public BroadcastCapableChannel jmsPublishSubscribeChannel() { + public JmsPublishSubscribeMessageChannelSpec jmsPublishSubscribeChannel() { return Jms.publishSubscribeChannel(amqFactory) - .destination("pubsub") - .get(); + .destination("pubsub"); } @Bean - public IntegrationFlow pubSubFlow() { + public IntegrationFlow pubSubFlow(SubscribableJmsChannel jmsPublishSubscribeChannel) { return f -> f - .publishSubscribeChannel(jmsPublishSubscribeChannel(), + .publishSubscribeChannel(jmsPublishSubscribeChannel, pubsub -> pubsub .subscribe(subFlow -> subFlow .channel(c -> c.queue("jmsPubSubBridgeChannel"))) @@ -408,8 +409,7 @@ public IntegrationFlow jmsMessageDrivenFlowWithContainer() { .from(Jms.messageDrivenChannelAdapter( Jms.container(amqFactory, "containerSpecDestination") .pubSubDomain(false) - .taskExecutor(Executors.newCachedThreadPool()) - .get())) + .taskExecutor(Executors.newCachedThreadPool()))) .transform(String::trim) .channel(jmsOutboundInboundReplyChannel()) .get(); diff --git a/spring-integration-jms/src/test/kotlin/org/springframework/integration/jms/dsl/JmsDslKotlinTests.kt b/spring-integration-jms/src/test/kotlin/org/springframework/integration/jms/dsl/JmsDslKotlinTests.kt index e1d38f65582..af723f62dd1 100644 --- a/spring-integration-jms/src/test/kotlin/org/springframework/integration/jms/dsl/JmsDslKotlinTests.kt +++ b/spring-integration-jms/src/test/kotlin/org/springframework/integration/jms/dsl/JmsDslKotlinTests.kt @@ -1,5 +1,5 @@ /* - * Copyright 2018-2021 the original author or authors. + * Copyright 2018-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -29,6 +29,7 @@ import org.springframework.beans.factory.annotation.Qualifier import org.springframework.context.annotation.Bean import org.springframework.context.annotation.Configuration import org.springframework.integration.IntegrationMessageHeaderAccessor +import org.springframework.integration.channel.QueueChannel import org.springframework.integration.config.EnableIntegration import org.springframework.integration.dsl.MessageChannels import org.springframework.integration.dsl.integrationFlow @@ -112,10 +113,10 @@ class JmsDslKotlinTests : ActiveMQMultiContextTests() { } @Bean - fun jmsOutboundInboundReplyChannel() = MessageChannels.queue().get() + fun jmsOutboundInboundReplyChannel() = MessageChannels.queue() @Bean - fun jmsMessageDrivenFlowWithContainer() = + fun jmsMessageDrivenFlowWithContainer(jmsOutboundInboundReplyChannel: QueueChannel) = integrationFlow( Jms.messageDrivenChannelAdapter( Jms.container(amqFactory, "containerSpecDestination") @@ -125,7 +126,7 @@ class JmsDslKotlinTests : ActiveMQMultiContextTests() { .headerMapper(jmsHeaderMapper()) ) { transform { it: String -> it.trim { it <= ' ' } } - channel(jmsOutboundInboundReplyChannel()) + channel(jmsOutboundInboundReplyChannel) } } diff --git a/spring-integration-kafka/src/main/java/org/springframework/integration/kafka/dsl/KafkaInboundGatewaySpec.java b/spring-integration-kafka/src/main/java/org/springframework/integration/kafka/dsl/KafkaInboundGatewaySpec.java index 96407478f82..f06a9d2ec43 100644 --- a/spring-integration-kafka/src/main/java/org/springframework/integration/kafka/dsl/KafkaInboundGatewaySpec.java +++ b/spring-integration-kafka/src/main/java/org/springframework/integration/kafka/dsl/KafkaInboundGatewaySpec.java @@ -1,5 +1,5 @@ /* - * Copyright 2018-2021 the original author or authors. + * Copyright 2018-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -102,6 +102,7 @@ public S recoveryCallback(RecoveryCallback recoveryCallback) { */ public S onPartitionsAssignedSeekCallback( BiConsumer, ConsumerSeekAware.ConsumerSeekCallback> onPartitionsAssignedCallback) { + this.target.setOnPartitionsAssignedSeekCallback(onPartitionsAssignedCallback); return _this(); } @@ -128,7 +129,7 @@ public static class KafkaInboundGatewayListenerContainerSpec extends KafkaInboundGatewayListenerContainerSpec(KafkaMessageListenerContainerSpec containerSpec, KafkaTemplateSpec templateSpec) { - super(containerSpec.get(), templateSpec.getTemplate()); + super(containerSpec.getObject(), templateSpec.getTemplate()); this.containerSpec = containerSpec; this.templateSpec = templateSpec; } @@ -164,8 +165,8 @@ public KafkaInboundGatewayListenerContainerSpec configureTemplate( @Override public Map getComponentsToRegister() { return new ObjectStringMapBuilder() - .put(this.containerSpec.get(), this.containerSpec.getId()) - .put(this.templateSpec.get(), this.templateSpec.getId()) + .put(this.containerSpec.getObject(), this.containerSpec.getId()) + .put(this.templateSpec.getObject(), this.templateSpec.getId()) .get(); } diff --git a/spring-integration-kafka/src/main/java/org/springframework/integration/kafka/dsl/KafkaMessageDrivenChannelAdapterSpec.java b/spring-integration-kafka/src/main/java/org/springframework/integration/kafka/dsl/KafkaMessageDrivenChannelAdapterSpec.java index cb1c7e066a6..8d3062966df 100644 --- a/spring-integration-kafka/src/main/java/org/springframework/integration/kafka/dsl/KafkaMessageDrivenChannelAdapterSpec.java +++ b/spring-integration-kafka/src/main/java/org/springframework/integration/kafka/dsl/KafkaMessageDrivenChannelAdapterSpec.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2021 the original author or authors. + * Copyright 2016-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -57,6 +57,7 @@ public class KafkaMessageDrivenChannelAdapterSpec messageListenerContainer, KafkaMessageDrivenChannelAdapter.ListenerMode listenerMode) { + super(new KafkaMessageDrivenChannelAdapter<>(messageListenerContainer, listenerMode)); this.container = messageListenerContainer; } @@ -174,6 +175,7 @@ public S filterInRetry(boolean filterInRetry) { */ public S onPartitionsAssignedSeekCallback( BiConsumer, ConsumerSeekAware.ConsumerSeekCallback> onPartitionsAssignedCallback) { + this.target.setOnPartitionsAssignedSeekCallback(onPartitionsAssignedCallback); return _this(); } @@ -196,7 +198,7 @@ public static class KafkaMessageDrivenChannelAdapterListenerContainerSpec KafkaMessageDrivenChannelAdapterListenerContainerSpec(KafkaMessageListenerContainerSpec spec, KafkaMessageDrivenChannelAdapter.ListenerMode listenerMode) { - super(spec.get(), listenerMode); + super(spec.getObject(), listenerMode); this.spec = spec; } @@ -208,6 +210,7 @@ public static class KafkaMessageDrivenChannelAdapterListenerContainerSpec */ public KafkaMessageDrivenChannelAdapterListenerContainerSpec configureListenerContainer( Consumer> configurer) { + Assert.notNull(configurer, "The 'configurer' cannot be null"); configurer.accept(this.spec); return _this(); @@ -215,7 +218,7 @@ public KafkaMessageDrivenChannelAdapterListenerContainerSpec configureList @Override public Map getComponentsToRegister() { - return Collections.singletonMap(this.spec.get(), this.spec.getId()); + return Collections.singletonMap(this.spec.getObject(), this.spec.getId()); } } diff --git a/spring-integration-kafka/src/main/java/org/springframework/integration/kafka/dsl/KafkaOutboundGatewaySpec.java b/spring-integration-kafka/src/main/java/org/springframework/integration/kafka/dsl/KafkaOutboundGatewaySpec.java index 91dad5cf771..99beb2326f8 100644 --- a/spring-integration-kafka/src/main/java/org/springframework/integration/kafka/dsl/KafkaOutboundGatewaySpec.java +++ b/spring-integration-kafka/src/main/java/org/springframework/integration/kafka/dsl/KafkaOutboundGatewaySpec.java @@ -1,5 +1,5 @@ /* - * Copyright 2018-2022 the original author or authors. + * Copyright 2018-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -110,7 +110,7 @@ public KafkaGatewayMessageHandlerTemplateSpec configureKafkaTemplate( @Override public Map getComponentsToRegister() { - return Collections.singletonMap(this.kafkaTemplateSpec.get(), this.kafkaTemplateSpec.getId()); + return Collections.singletonMap(this.kafkaTemplateSpec.getTemplate(), this.kafkaTemplateSpec.getId()); } } diff --git a/spring-integration-kafka/src/main/java/org/springframework/integration/kafka/dsl/KafkaProducerMessageHandlerSpec.java b/spring-integration-kafka/src/main/java/org/springframework/integration/kafka/dsl/KafkaProducerMessageHandlerSpec.java index 9127d518955..68c0c5040e4 100644 --- a/spring-integration-kafka/src/main/java/org/springframework/integration/kafka/dsl/KafkaProducerMessageHandlerSpec.java +++ b/spring-integration-kafka/src/main/java/org/springframework/integration/kafka/dsl/KafkaProducerMessageHandlerSpec.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2021 the original author or authors. + * Copyright 2016-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -429,10 +429,9 @@ public KafkaProducerMessageHandlerTemplateSpec configureKafkaTemplate( @Override public Map getComponentsToRegister() { - return Collections.singletonMap(this.kafkaTemplateSpec.get(), this.kafkaTemplateSpec.getId()); + return Collections.singletonMap(this.kafkaTemplateSpec.getTemplate(), this.kafkaTemplateSpec.getId()); } } } - diff --git a/spring-integration-kafka/src/test/java/org/springframework/integration/kafka/dsl/KafkaDslTests.java b/spring-integration-kafka/src/test/java/org/springframework/integration/kafka/dsl/KafkaDslTests.java index 13eac986e83..bf2037ee1de 100644 --- a/spring-integration-kafka/src/test/java/org/springframework/integration/kafka/dsl/KafkaDslTests.java +++ b/spring-integration-kafka/src/test/java/org/springframework/integration/kafka/dsl/KafkaDslTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2015-2022 the original author or authors. + * Copyright 2015-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -38,12 +38,12 @@ import org.springframework.context.annotation.Configuration; import org.springframework.integration.IntegrationMessageHeaderAccessor; import org.springframework.integration.MessageRejectedException; -import org.springframework.integration.channel.BroadcastCapableChannel; import org.springframework.integration.channel.QueueChannel; import org.springframework.integration.config.EnableIntegration; import org.springframework.integration.dsl.IntegrationFlow; import org.springframework.integration.dsl.Pollers; import org.springframework.integration.kafka.channel.PollableKafkaChannel; +import org.springframework.integration.kafka.channel.PublishSubscribeKafkaChannel; import org.springframework.integration.kafka.inbound.KafkaErrorSendingMessageRecoverer; import org.springframework.integration.kafka.inbound.KafkaInboundGateway; import org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter; @@ -427,10 +427,11 @@ public KafkaMessageSource channelSource(ConsumerFactory template, ConcurrentKafkaListenerContainerFactory containerFactory, - KafkaMessageSource channelSource) { + KafkaMessageSource channelSource, + PublishSubscribeKafkaChannel publishSubscribeKafkaChannel) { return IntegrationFlow.from(topic6Channel(template, containerFactory)) - .publishSubscribeChannel(pubSub(template, containerFactory), channel -> channel + .publishSubscribeChannel(publishSubscribeKafkaChannel, channel -> channel .subscribe(f -> f.channel( Kafka.pollableChannel(template, channelSource).id("topic8Channel"))) .subscribe(f -> f.channel( @@ -439,11 +440,10 @@ public IntegrationFlow channels(KafkaTemplate template, } @Bean - public BroadcastCapableChannel pubSub(KafkaTemplate template, + public KafkaPublishSubscribeChannelSpec pubSub(KafkaTemplate template, ConcurrentKafkaListenerContainerFactory containerFactory) { - return Kafka.publishSubscribeChannel(template, containerFactory, TEST_TOPIC7) - .get(); + return Kafka.publishSubscribeChannel(template, containerFactory, TEST_TOPIC7); } @Bean diff --git a/spring-integration-kafka/src/test/java/org/springframework/integration/kafka/inbound/MessageDrivenAdapterTests.java b/spring-integration-kafka/src/test/java/org/springframework/integration/kafka/inbound/MessageDrivenAdapterTests.java index 66b1de007f4..8bf0b64966d 100644 --- a/spring-integration-kafka/src/test/java/org/springframework/integration/kafka/inbound/MessageDrivenAdapterTests.java +++ b/spring-integration-kafka/src/test/java/org/springframework/integration/kafka/inbound/MessageDrivenAdapterTests.java @@ -539,7 +539,7 @@ void testInboundJsonWithPayload() { .messageDrivenChannelAdapter(container, ListenerMode.record) .recordMessageConverter(new StringJsonMessageConverter()) .payloadType(Foo.class) - .get(); + .getObject(); QueueChannel out = new QueueChannel(); adapter.setOutputChannel(out); adapter.afterPropertiesSet(); diff --git a/spring-integration-mongodb/src/test/java/org/springframework/integration/mongodb/dsl/MongoDbTests.java b/spring-integration-mongodb/src/test/java/org/springframework/integration/mongodb/dsl/MongoDbTests.java index 4b419fdd42c..d4237c39999 100644 --- a/spring-integration-mongodb/src/test/java/org/springframework/integration/mongodb/dsl/MongoDbTests.java +++ b/spring-integration-mongodb/src/test/java/org/springframework/integration/mongodb/dsl/MongoDbTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2022 the original author or authors. + * Copyright 2016-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -40,9 +40,11 @@ import org.springframework.data.mongodb.core.query.BasicQuery; import org.springframework.data.mongodb.core.query.Criteria; import org.springframework.data.mongodb.core.query.Query; +import org.springframework.integration.channel.QueueChannel; import org.springframework.integration.config.EnableIntegration; import org.springframework.integration.dsl.IntegrationFlow; import org.springframework.integration.dsl.MessageChannels; +import org.springframework.integration.dsl.QueueChannelSpec; import org.springframework.integration.handler.ReplyRequiredException; import org.springframework.integration.mongodb.MongoDbContainerTest; import org.springframework.integration.mongodb.outbound.MessageCollectionCallback; @@ -333,16 +335,16 @@ public IntegrationFlow gatewayCollectionNameFunctionFlow() { } @Bean - public IntegrationFlow gatewayCollectionCallbackFlow() { + public IntegrationFlow gatewayCollectionCallbackFlow(QueueChannel getResultChannel) { return f -> f .handle(collectionCallbackOutboundGateway( (collection, requestMessage) -> collection.countDocuments())) - .channel(getResultChannel()); + .channel(getResultChannel); } @Bean - public MessageChannel getResultChannel() { - return MessageChannels.queue().get(); + public QueueChannelSpec getResultChannel() { + return MessageChannels.queue(); } @Bean diff --git a/spring-integration-scripting/src/main/java/org/springframework/integration/scripting/dsl/ScriptMessageSourceSpec.java b/spring-integration-scripting/src/main/java/org/springframework/integration/scripting/dsl/ScriptMessageSourceSpec.java index dae6a1faafa..2d9fada5d5f 100644 --- a/spring-integration-scripting/src/main/java/org/springframework/integration/scripting/dsl/ScriptMessageSourceSpec.java +++ b/spring-integration-scripting/src/main/java/org/springframework/integration/scripting/dsl/ScriptMessageSourceSpec.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2019 the original author or authors. + * Copyright 2016-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -67,7 +67,7 @@ public ScriptMessageSourceSpec lang(String lang) { * The {@link ScriptVariableGenerator} to use. * @param variableGenerator the {@link ScriptVariableGenerator} * @return the current spec - * @see ScriptSpec#variableGenerator + * @see ScriptSpec#variableGenerator(ScriptVariableGenerator) */ public ScriptMessageSourceSpec variableGenerator(ScriptVariableGenerator variableGenerator) { this.delegate.variableGenerator(variableGenerator); @@ -121,12 +121,12 @@ public ScriptMessageSourceSpec refreshCheckDelay(long refreshCheckDelay) { @Override protected MessageSource doGet() { - return new MessageProcessorMessageSource(this.delegate.get()); + return new MessageProcessorMessageSource(this.delegate.getObject()); } @Override public Map getComponentsToRegister() { - return Collections.singletonMap(this.delegate.get(), this.delegate.getId()); + return Collections.singletonMap(this.delegate.getObject(), this.delegate.getId()); } } diff --git a/spring-integration-ws/src/test/java/org/springframework/integration/ws/dsl/WsDslTests.java b/spring-integration-ws/src/test/java/org/springframework/integration/ws/dsl/WsDslTests.java index 6e2994d77fc..9c2c8ce2462 100644 --- a/spring-integration-ws/src/test/java/org/springframework/integration/ws/dsl/WsDslTests.java +++ b/spring-integration-ws/src/test/java/org/springframework/integration/ws/dsl/WsDslTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2020-2022 the original author or authors. + * Copyright 2020-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -59,12 +59,12 @@ void marshallingInbound() { Unmarshaller unmarshaller = mock(Unmarshaller.class); MarshallingWebServiceInboundGateway gateway = Ws.marshallingInboundGateway(marshaller) .unmarshaller(unmarshaller) - .get(); + .getObject(); assertThat(TestUtils.getPropertyValue(gateway, "marshaller")).isSameAs(marshaller); assertThat(TestUtils.getPropertyValue(gateway, "unmarshaller")).isSameAs(unmarshaller); marshaller = mock(Both.class); - gateway = Ws.marshallingInboundGateway(marshaller).get(); + gateway = Ws.marshallingInboundGateway(marshaller).getObject(); assertThat(TestUtils.getPropertyValue(gateway, "marshaller")).isSameAs(marshaller); assertThat(TestUtils.getPropertyValue(gateway, "unmarshaller")).isSameAs(marshaller); } @@ -73,7 +73,7 @@ void marshallingInbound() { void simpleInbound() { SimpleWebServiceInboundGateway gateway = Ws.simpleInboundGateway() .extractPayload(false) - .get(); + .getObject(); assertThat(TestUtils.getPropertyValue(gateway, "extractPayload", Boolean.class)).isFalse(); } @@ -104,7 +104,7 @@ void marshallingOutbound() { .messageSenders(messageSender) .requestCallback(requestCallback) .uriVariableExpressions(uriVariableExpressions) - .get(); + .getObject(); assertThat(TestUtils.getPropertyValue(gateway, "webServiceTemplate.marshaller")).isSameAs(marshaller); assertThat(TestUtils.getPropertyValue(gateway, "webServiceTemplate.unmarshaller")).isSameAs(unmarshaller); assertThat(TestUtils.getPropertyValue(gateway, "webServiceTemplate.messageFactory")).isSameAs(messageFactory); @@ -147,7 +147,7 @@ void simpleOutbound() { .requestCallback(requestCallback) .uriVariableExpressions(uriVariableExpressions) .extractPayload(false) - .get(); + .getObject(); assertThat(TestUtils.getPropertyValue(gateway, "webServiceTemplate.messageFactory")).isSameAs(messageFactory); assertThat(TestUtils.getPropertyValue(gateway, "webServiceTemplate.faultMessageResolver")) .isSameAs(faultMessageResolver); @@ -178,7 +178,7 @@ void marshallingOutboundTemplate() { .ignoreEmptyResponses(true) .requestCallback(requestCallback) .uriVariableExpressions(uriVariableExpressions) - .get(); + .getObject(); assertThat(TestUtils.getPropertyValue(gateway, "uri")).isSameAs(uri); assertThat(TestUtils.getPropertyValue(gateway, "headerMapper")).isSameAs(headerMapper); assertThat(TestUtils.getPropertyValue(gateway, "requestCallback")).isSameAs(requestCallback); @@ -209,7 +209,7 @@ void simpleOutboundTemplate() { .requestCallback(requestCallback) .uriVariableExpressions(uriVariableExpressions) .extractPayload(false) - .get(); + .getObject(); assertThat(TestUtils.getPropertyValue(gateway, "headerMapper")).isSameAs(headerMapper); assertThat(TestUtils.getPropertyValue(gateway, "requestCallback")).isSameAs(requestCallback); assertThat(TestUtils.getPropertyValue(gateway, "uriVariableExpressions")).isEqualTo(uriVariableExpressions); @@ -225,4 +225,3 @@ interface Both extends Marshaller, Unmarshaller { } } - diff --git a/src/reference/asciidoc/dsl.adoc b/src/reference/asciidoc/dsl.adoc index 9b18c0a54e0..89c65f0f70c 100644 --- a/src/reference/asciidoc/dsl.adoc +++ b/src/reference/asciidoc/dsl.adoc @@ -39,8 +39,8 @@ public class MyConfiguration { } @Bean - public IntegrationFlow myFlow() { - return IntegrationFlow.fromSupplier(integerSource()::getAndIncrement, + public IntegrationFlow myFlow(AtomicInteger integerSource) { + return IntegrationFlow.fromSupplier(integerSource::getAndIncrement, c -> c.poller(Pollers.fixedRate(100))) .channel("inputChannel") .filter((Integer p) -> p > 0) @@ -63,6 +63,10 @@ You need not replace all of your existing XML configuration to use Java configur The `org.springframework.integration.dsl` package contains the `IntegrationFlowBuilder` API mentioned earlier and a number of `IntegrationComponentSpec` implementations, which are also builders and provide the fluent API to configure concrete endpoints. The `IntegrationFlowBuilder` infrastructure provides common https://www.enterpriseintegrationpatterns.com/[enterprise integration patterns] (EIP) for message-based applications, such as channels, endpoints, pollers, and channel interceptors. +IMPORTANT:: The `IntegrationComponentSpec` is a `FactoryBean` implementation, therefore its `getObject()` method must not be called from bean definitions. +The `IntegrationComponentSpec` implementation must be left as is for bean definitions and the framework will manage its lifecycle. +Bean method parameter injection for the target `IntegrationComponentSpec` type (a `FactoryBean` value) must be used for `IntegrationFlow` bean definitions instead of bean method references. + Endpoints are expressed as verbs in the DSL to improve readability. The following list includes the common DSL method names and the associated EIP endpoint: @@ -163,10 +167,9 @@ The following example shows how to use it: [source,java] ---- @Bean -public MessageChannel priorityChannel() { +public PriorityChannelSpec priorityChannel() { return MessageChannels.priority(this.mongoDbChannelMessageStore, "priorityGroup") - .interceptor(wireTap()) - .get(); + .interceptor(wireTap()); } ---- ==== @@ -181,13 +184,13 @@ The following example shows the possible ways to use the `channel()` EIP method: [source,java] ---- @Bean -public MessageChannel queueChannel() { - return MessageChannels.queue().get(); +public QueueChannelSpec queueChannel() { + return MessageChannels.queue(); } @Bean -public MessageChannel publishSubscribe() { - return MessageChannels.publishSubscribe().get(); +public PublishSubscribeChannelSpec publishSubscribe() { + return MessageChannels.publishSubscribe(); } @Bean @@ -261,7 +264,7 @@ public PollerSpec poller() { See https://docs.spring.io/spring-integration/api/org/springframework/integration/dsl/Pollers.html[`Pollers`] and https://docs.spring.io/spring-integration/api/org/springframework/integration/dsl/PollerSpec.html[`PollerSpec`] in the Javadoc for more information. -IMPORTANT: If you use the DSL to construct a `PollerSpec` as a `@Bean`, do not call the `get()` method in the bean definition. +IMPORTANT: If you use the DSL to construct a `PollerSpec` as a `@Bean`, do not call the `getObject()` method in the bean definition. The `PollerSpec` is a `FactoryBean` that generates the `PollerMetadata` object from the specification and initializes all of its properties. [[java-dsl-reactive]] @@ -833,30 +836,21 @@ For example, we now can configure several subscribers as sub-flows on the `Jms.p [source,java] ---- @Bean -public BroadcastCapableChannel jmsPublishSubscribeChannel() { +public JmsPublishSubscribeMessageChannelSpec jmsPublishSubscribeChannel() { return Jms.publishSubscribeChannel(jmsConnectionFactory()) - .destination("pubsub") - .get(); + .destination("pubsub"); } @Bean -public IntegrationFlow pubSubFlow() { +public IntegrationFlow pubSubFlow(BroadcastCapableChannel jmsPublishSubscribeChannel) { return f -> f - .publishSubscribeChannel(jmsPublishSubscribeChannel(), + .publishSubscribeChannel(jmsPublishSubscribeChannel, pubsub -> pubsub .subscribe(subFlow -> subFlow .channel(c -> c.queue("jmsPubSubBridgeChannel1"))) .subscribe(subFlow -> subFlow .channel(c -> c.queue("jmsPubSubBridgeChannel2")))); } - -@Bean -public BroadcastCapableChannel jmsPublishSubscribeChannel(ConnectionFactory jmsConnectionFactory) { - return (BroadcastCapableChannel) Jms.publishSubscribeChannel(jmsConnectionFactory) - .destination("pubsub") - .get(); -} - ---- ==== diff --git a/src/reference/asciidoc/whats-new.adoc b/src/reference/asciidoc/whats-new.adoc index 9df89de4eca..3ead59b3e4d 100644 --- a/src/reference/asciidoc/whats-new.adoc +++ b/src/reference/asciidoc/whats-new.adoc @@ -41,6 +41,11 @@ See <<./filter.adoc#filter, Filter>> for more information. - The default timeout for send and receive operations in gateways and replying channel adapters has been changed from infinity to `30` seconds. Only one left as a `1` second is a `receiveTimeout` for `PollingConsumer` to not block a scheduler thread too long and let other queued tasks to be performed with the `TaskScheduler`. + - The `IntegrationComponentSpec.get()` method has been deprecated with removal planned for the next version. +Since `IntegrationComponentSpec` is a `FactoryBean`, its bean definition must stay as is without any target object resolutions. +The Java DSL and the framework by itself will manage the `IntegrationComponentSpec` lifecycle. +See <<./dsl.adoc#java-dsl, Java DSL>> for more information. + [[x6.1-web-sockets]] === Web Sockets Changes