Skip to content

Commit 24348a1

Browse files
committed
Rely on the RSocket FrameType header
* After introduction an `rsocketFrameType` header in the message, we don't need a custom `IntegrationRSocket` any more. Fully rely on the `MessagingRSocket` from the Spring Messaging, only handle properly a `RSocketFrameTypeMessageCondition.CONNECT_CONDITION` for connections from clients to emit an `RSocketConnectedEvent` * Copy a `ChannelSendOperator` from the Spring Messaging for the proper reply handling, when we are interested in the source `Publisher` subscription first. See its JavaDocs for more info * Reusing already `public PayloadUtils` instead of its method copies
1 parent 53d4faa commit 24348a1

File tree

6 files changed

+527
-388
lines changed

6 files changed

+527
-388
lines changed

spring-integration-rsocket/src/main/java/org/springframework/integration/rsocket/AbstractRSocketConnector.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ public abstract class AbstractRSocketConnector
5353

5454
private MimeType dataMimeType = MimeTypeUtils.TEXT_PLAIN;
5555

56-
private MimeType metadataMimeType = IntegrationRSocket.COMPOSITE_METADATA;
56+
private MimeType metadataMimeType = new MimeType("message", "x.rsocket.composite-metadata.v0");
5757

5858
private RSocketStrategies rsocketStrategies =
5959
RSocketStrategies.builder()

spring-integration-rsocket/src/main/java/org/springframework/integration/rsocket/IntegrationRSocket.java

Lines changed: 0 additions & 238 deletions
This file was deleted.

spring-integration-rsocket/src/main/java/org/springframework/integration/rsocket/IntegrationRSocketMessageHandler.java

Lines changed: 2 additions & 96 deletions
Original file line numberDiff line numberDiff line change
@@ -19,30 +19,18 @@
1919
import java.lang.reflect.Method;
2020
import java.util.Collections;
2121
import java.util.List;
22-
import java.util.function.BiFunction;
2322

2423
import org.springframework.context.ApplicationContext;
2524
import org.springframework.core.MethodParameter;
26-
import org.springframework.lang.Nullable;
2725
import org.springframework.messaging.Message;
2826
import org.springframework.messaging.ReactiveMessageHandler;
2927
import org.springframework.messaging.handler.CompositeMessageCondition;
3028
import org.springframework.messaging.handler.DestinationPatternsMessageCondition;
3129
import org.springframework.messaging.handler.invocation.reactive.HandlerMethodArgumentResolver;
3230
import org.springframework.messaging.handler.invocation.reactive.SyncHandlerMethodArgumentResolver;
33-
import org.springframework.messaging.rsocket.RSocketRequester;
34-
import org.springframework.messaging.rsocket.RSocketStrategies;
35-
import org.springframework.messaging.rsocket.annotation.support.DefaultMetadataExtractor;
36-
import org.springframework.messaging.rsocket.annotation.support.MetadataExtractor;
31+
import org.springframework.messaging.rsocket.annotation.support.RSocketFrameTypeMessageCondition;
3732
import org.springframework.messaging.rsocket.annotation.support.RSocketMessageHandler;
38-
import org.springframework.util.Assert;
39-
import org.springframework.util.MimeType;
40-
import org.springframework.util.MimeTypeUtils;
4133
import org.springframework.util.ReflectionUtils;
42-
import org.springframework.util.StringUtils;
43-
44-
import io.rsocket.ConnectionSetupPayload;
45-
import io.rsocket.RSocket;
4634

4735
/**
4836
* The {@link RSocketMessageHandler} extension for Spring Integration needs.
@@ -60,63 +48,10 @@ class IntegrationRSocketMessageHandler extends RSocketMessageHandler {
6048
private static final Method HANDLE_MESSAGE_METHOD =
6149
ReflectionUtils.findMethod(ReactiveMessageHandler.class, "handleMessage", Message.class);
6250

63-
@Nullable
64-
private MimeType defaultDataMimeType;
65-
66-
private MimeType defaultMetadataMimeType = IntegrationRSocket.COMPOSITE_METADATA;
67-
68-
private MetadataExtractor metadataExtractor;
69-
7051
IntegrationRSocketMessageHandler() {
7152
setHandlerPredicate((clazz) -> false);
7253
}
7354

74-
/**
75-
* Configure the default content type to use for data payloads.
76-
* <p>By default this is not set. However a server acceptor will use the
77-
* content type from the {@link io.rsocket.ConnectionSetupPayload}, so this is typically
78-
* required for clients but can also be used on servers as a fallback.
79-
* @param defaultDataMimeType the MimeType to use
80-
*/
81-
@Override
82-
public void setDefaultDataMimeType(@Nullable MimeType defaultDataMimeType) {
83-
super.setDefaultDataMimeType(defaultDataMimeType);
84-
this.defaultDataMimeType = defaultDataMimeType;
85-
}
86-
87-
88-
/**
89-
* Configure the default {@code MimeType} for payload data if the
90-
* {@code SETUP} frame did not specify one.
91-
* <p>By default this is set to {@code "message/x.rsocket.composite-metadata.v0"}
92-
* @param mimeType the MimeType to use
93-
*/
94-
@Override
95-
public void setDefaultMetadataMimeType(MimeType mimeType) {
96-
super.setDefaultMetadataMimeType(mimeType);
97-
this.defaultMetadataMimeType = mimeType;
98-
}
99-
100-
/**
101-
* Configure a {@link MetadataExtractor} to extract the route and possibly
102-
* other metadata from the first payload of incoming requests.
103-
* <p>By default this is a {@link DefaultMetadataExtractor} with the
104-
* configured {@link RSocketStrategies} (and decoders), extracting a route
105-
* from {@code "message/x.rsocket.routing.v0"} or {@code "text/plain"}
106-
* metadata entries.
107-
* @param extractor the extractor to use
108-
*/
109-
@Override
110-
public void setMetadataExtractor(MetadataExtractor extractor) {
111-
super.setMetadataExtractor(extractor);
112-
this.metadataExtractor = extractor;
113-
}
114-
115-
@Override
116-
public BiFunction<ConnectionSetupPayload, RSocket, RSocket> clientAcceptor() {
117-
return this::createRSocket;
118-
}
119-
12055
public boolean detectEndpoints() {
12156
ApplicationContext applicationContext = getApplicationContext();
12257
if (applicationContext != null && getHandlerMethods().isEmpty()) {
@@ -135,6 +70,7 @@ public boolean detectEndpoints() {
13570
public void addEndpoint(IntegrationRSocketEndpoint endpoint) {
13671
registerHandlerMethod(endpoint, HANDLE_MESSAGE_METHOD,
13772
new CompositeMessageCondition(
73+
RSocketFrameTypeMessageCondition.REQUEST_CONDITION,
13874
new DestinationPatternsMessageCondition(endpoint.getPath(), getRouteMatcher())));
13975
}
14076

@@ -143,36 +79,6 @@ protected List<? extends HandlerMethodArgumentResolver> initArgumentResolvers()
14379
return Collections.singletonList(new MessageHandlerMethodArgumentResolver());
14480
}
14581

146-
@Override
147-
public void afterPropertiesSet() {
148-
super.afterPropertiesSet();
149-
if (this.metadataExtractor == null) {
150-
DefaultMetadataExtractor extractor = new DefaultMetadataExtractor(getRSocketStrategies()); // NOSONAR
151-
extractor.metadataToExtract(MimeTypeUtils.TEXT_PLAIN, String.class, MetadataExtractor.ROUTE_KEY);
152-
this.metadataExtractor = extractor;
153-
}
154-
}
155-
156-
protected IntegrationRSocket createRSocket(ConnectionSetupPayload setupPayload, RSocket rsocket) {
157-
String mimeType = setupPayload.dataMimeType();
158-
MimeType dataMimeType =
159-
StringUtils.hasText(mimeType)
160-
? MimeTypeUtils.parseMimeType(mimeType)
161-
: this.defaultDataMimeType;
162-
Assert.notNull(dataMimeType, "No `dataMimeType` in ConnectionSetupPayload and no default value");
163-
mimeType = setupPayload.metadataMimeType();
164-
MimeType metaMimeType =
165-
StringUtils.hasText(mimeType)
166-
? MimeTypeUtils.parseMimeType(mimeType)
167-
: this.defaultMetadataMimeType;
168-
Assert.notNull(dataMimeType, "No `metadataMimeType` in ConnectionSetupPayload and no default value");
169-
RSocketStrategies rSocketStrategies = getRSocketStrategies();
170-
Assert.notNull(rSocketStrategies, "No `rSocketStrategies` provided");
171-
RSocketRequester requester = RSocketRequester.wrap(rsocket, dataMimeType, metaMimeType, rSocketStrategies);
172-
return new IntegrationRSocket(this, getRouteMatcher(), requester, dataMimeType, metaMimeType,
173-
this.metadataExtractor, rSocketStrategies.dataBufferFactory());
174-
}
175-
17682
private static final class MessageHandlerMethodArgumentResolver implements SyncHandlerMethodArgumentResolver {
17783

17884
@Override

0 commit comments

Comments
 (0)