Skip to content

Commit 79fdd55

Browse files
akuma8Attoumane Ahamadi
authored and
Attoumane Ahamadi
committed
Adding Channel Adapters for Redis Streams
1 parent 818f5a5 commit 79fdd55

File tree

2 files changed

+245
-0
lines changed

2 files changed

+245
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,155 @@
1+
package org.springframework.integration.redis.inbound;
2+
3+
import java.time.Duration;
4+
import java.util.HashMap;
5+
import java.util.Map;
6+
7+
import org.springframework.core.task.SimpleAsyncTaskExecutor;
8+
import org.springframework.data.redis.connection.RedisConnectionFactory;
9+
import org.springframework.data.redis.connection.stream.Consumer;
10+
import org.springframework.data.redis.connection.stream.ReadOffset;
11+
import org.springframework.data.redis.connection.stream.Record;
12+
import org.springframework.data.redis.connection.stream.StreamOffset;
13+
import org.springframework.data.redis.core.RedisCallback;
14+
import org.springframework.data.redis.core.RedisTemplate;
15+
import org.springframework.data.redis.hash.ObjectHashMapper;
16+
import org.springframework.data.redis.serializer.StringRedisSerializer;
17+
import org.springframework.data.redis.stream.StreamListener;
18+
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
19+
import org.springframework.integration.endpoint.MessageProducerSupport;
20+
import org.springframework.lang.Nullable;
21+
import org.springframework.messaging.Message;
22+
import org.springframework.messaging.MessageHeaders;
23+
import org.springframework.messaging.converter.MessageConverter;
24+
import org.springframework.messaging.converter.SimpleMessageConverter;
25+
import org.springframework.util.Assert;
26+
import org.springframework.util.StringUtils;
27+
28+
/**
29+
* Read Message from a Redis Stream and publish it to the indicated output channel.
30+
*/
31+
public class RedisStreamInboundChannelAdapter extends MessageProducerSupport {
32+
33+
private volatile StreamMessageListenerContainer container;
34+
35+
private volatile StreamMessageListenerContainer.StreamMessageListenerContainerOptions containerOptions = StreamMessageListenerContainer.StreamMessageListenerContainerOptions
36+
.builder()
37+
.serializer( StringRedisSerializer.UTF_8 )
38+
.pollTimeout( Duration.ZERO )
39+
.executor( new SimpleAsyncTaskExecutor() )
40+
.batchSize( 1 )
41+
.objectMapper( new ObjectHashMapper() )
42+
.build();
43+
44+
private final String streamKey;
45+
46+
private volatile @Nullable String consumerGroupName;
47+
48+
private volatile @Nullable String consumerName;
49+
50+
private volatile boolean createGroupeIfNotExist = false;
51+
52+
private volatile ReadOffset readOffset = ReadOffset.lastConsumed();
53+
54+
private final RedisTemplate<String, ?> template;
55+
56+
private final MessageConverter messageConverter = new SimpleMessageConverter();
57+
58+
private final RedisConnectionFactory connectionFactory;
59+
60+
public RedisStreamInboundChannelAdapter( String streamKey, RedisConnectionFactory connectionFactory ) {
61+
this.streamKey = streamKey;
62+
this.connectionFactory = connectionFactory;
63+
this.template = new RedisTemplate<>();
64+
this.template.setConnectionFactory( this.connectionFactory );
65+
this.template.setKeySerializer( new StringRedisSerializer() );
66+
this.template.afterPropertiesSet();
67+
}
68+
69+
@Override
70+
protected void onInit() {
71+
super.onInit();
72+
listenMessage();
73+
}
74+
75+
@Override
76+
protected void doStart() {
77+
super.doStart();
78+
createGroup();
79+
this.container.start();
80+
}
81+
82+
@Override
83+
protected void doStop() {
84+
super.doStop();
85+
this.container.stop();
86+
}
87+
88+
@Override
89+
public String getComponentType() {
90+
return "redis:stream-inbound-channel-adapter";
91+
}
92+
93+
private Message<?> convertMessage( Record<Object, Object> message ) {
94+
Map<String, Object> headers = new HashMap<>();
95+
headers.put( "streamKey", message.getStream() );
96+
headers.put( "Message-ID", message.getId() );
97+
return this.messageConverter.toMessage( message.getValue(), new MessageHeaders( headers ) );
98+
}
99+
100+
private void listenMessage() {
101+
this.container = StreamMessageListenerContainer.create( connectionFactory, this.containerOptions );
102+
103+
StreamListener<Object, Record<Object, Object>> streamListener = message -> sendMessage(
104+
convertMessage( message ) );
105+
106+
StreamOffset offset = StreamOffset.create( this.streamKey, this.readOffset );
107+
108+
if ( StringUtils.isEmpty( this.consumerGroupName ) ) {
109+
container.receive( offset, streamListener );
110+
} else {
111+
Assert.hasText( consumerName, "'consumerName' must be set" );
112+
Consumer consumer = Consumer.from( this.consumerGroupName, this.consumerName );
113+
container.receiveAutoAck( consumer, offset, streamListener );
114+
}
115+
}
116+
117+
// TODO : follow the resolution of this Spring Data Redis issue:
118+
// https://jira.spring.io/projects/DATAREDIS/issues/DATAREDIS-1119
119+
// And improve this method when it will be solved.
120+
/**
121+
* Create the Consumer Group if and only if it does not exist. During the
122+
* creation we can also create the stream ie {@code MKSTREAM}, that does not
123+
* have effect if the stream already exists.
124+
*/
125+
private void createGroup() {
126+
if ( createGroupeIfNotExist ) {
127+
Assert.hasText( consumerGroupName, "'consumerGroupName' must be set" );
128+
Assert.hasText( consumerName, "'consumerName' must be set" );
129+
try {
130+
template.execute( (RedisCallback<Object>) connection -> connection.execute( "XGROUP",
131+
"CREATE".getBytes(), streamKey.getBytes(), consumerGroupName.getBytes(),
132+
ReadOffset.latest().getOffset().getBytes(), "MKSTREAM".getBytes() ) );
133+
} catch ( Exception e ) {
134+
// An exception is thrown when the group already exists
135+
e.printStackTrace();
136+
}
137+
}
138+
}
139+
140+
public void setConsumerGroupName( @Nullable String consumerGroupName ) {
141+
this.consumerGroupName = consumerGroupName;
142+
}
143+
144+
public void setConsumerName( @Nullable String consumerName ) {
145+
this.consumerName = consumerName;
146+
}
147+
148+
public void setCreateGroupeIfNotExist( boolean createGroupeIfNotExist ) {
149+
this.createGroupeIfNotExist = createGroupeIfNotExist;
150+
}
151+
152+
public void setReadOffset( ReadOffset readOffset ) {
153+
this.readOffset = readOffset;
154+
}
155+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
package org.springframework.integration.redis.outbound;
2+
3+
import org.springframework.data.redis.connection.RedisConnectionFactory;
4+
import org.springframework.data.redis.connection.stream.ObjectRecord;
5+
import org.springframework.data.redis.connection.stream.StreamRecords;
6+
import org.springframework.data.redis.core.RedisTemplate;
7+
import org.springframework.data.redis.core.StreamOperations;
8+
import org.springframework.data.redis.serializer.RedisSerializer;
9+
import org.springframework.data.redis.serializer.StringRedisSerializer;
10+
import org.springframework.expression.Expression;
11+
import org.springframework.expression.common.LiteralExpression;
12+
import org.springframework.integration.handler.AbstractMessageHandler;
13+
import org.springframework.messaging.Message;
14+
import org.springframework.util.Assert;
15+
16+
/**
17+
* Take a message from a Channel and publish it to the given Redis Stream
18+
*/
19+
public class RedisStreamMessageHandler extends AbstractMessageHandler {
20+
21+
private final StreamOperations streamOperations;
22+
23+
private volatile RedisSerializer<?> serializer = StringRedisSerializer.UTF_8;
24+
25+
private volatile Expression streamKeyExpression;
26+
27+
private volatile boolean extractPayload = true;
28+
29+
public RedisStreamMessageHandler( String streamKey, RedisConnectionFactory connectionFactory ) {
30+
Assert.notNull( streamKey, "'streamName' must not be null" );
31+
Assert.notNull( connectionFactory, "'connectionFactory' must not be null" );
32+
RedisTemplate<String, Object> template = new RedisTemplate<>();
33+
template.setConnectionFactory( connectionFactory );
34+
template.setValueSerializer( this.serializer );
35+
template.setHashValueSerializer( this.serializer );
36+
template.setKeySerializer( StringRedisSerializer.UTF_8 );
37+
template.setHashKeySerializer( StringRedisSerializer.UTF_8 );
38+
template.afterPropertiesSet();
39+
this.streamOperations = template.opsForStream();
40+
this.setStreamKey( streamKey );
41+
}
42+
43+
public void setSerializer( RedisSerializer<?> serializer ) {
44+
Assert.notNull( serializer, "'serializer' must not be null" );
45+
this.serializer = serializer;
46+
}
47+
48+
public void setStreamKey( String streamKey ) {
49+
Assert.hasText( streamKey, "'streamKey' must not be an empty string." );
50+
this.streamKeyExpression = new LiteralExpression( streamKey );
51+
}
52+
53+
public void setStreamKeyExpression( Expression streamKeyExpression ) {
54+
this.streamKeyExpression = streamKeyExpression;
55+
}
56+
57+
public void setExtractPayload( boolean extractPayload) {
58+
this.extractPayload = extractPayload;
59+
}
60+
61+
@Override
62+
public String getComponentType() {
63+
return "redis:stream-outbound-channel-adapter";
64+
}
65+
66+
@Override
67+
protected void onInit() {
68+
super.onInit();
69+
Assert.notNull( streamKeyExpression, "'streamKeyExpression' must not be null" );
70+
}
71+
72+
@Override
73+
protected void handleMessageInternal( Message<?> message ) {
74+
75+
String streamKey = this.streamKeyExpression.getExpressionString();
76+
77+
Object value = message;
78+
79+
if ( this.extractPayload) {
80+
value = message.getPayload();
81+
}
82+
83+
ObjectRecord<String, Object> record = StreamRecords
84+
.<String, Object> objectBacked( value )
85+
.withStreamKey( streamKey );
86+
87+
this.streamOperations.add( record );
88+
}
89+
90+
}

0 commit comments

Comments
 (0)