1
1
/*
2
- * Copyright 2002-2020 the original author or authors.
2
+ * Copyright 2002-2021 the original author or authors.
3
3
*
4
4
* Licensed under the Apache License, Version 2.0 (the "License");
5
5
* you may not use this file except in compliance with the License.
17
17
package org .springframework .integration .mqtt ;
18
18
19
19
import static org .assertj .core .api .Assertions .assertThat ;
20
+ import static org .assertj .core .api .Assertions .assertThatExceptionOfType ;
20
21
import static org .assertj .core .api .Assertions .fail ;
21
22
import static org .mockito .Mockito .mock ;
22
23
23
- import java .util . Arrays ;
24
+ import java .io . File ;
24
25
import java .util .Collections ;
25
26
import java .util .concurrent .CountDownLatch ;
26
27
import java .util .concurrent .TimeUnit ;
27
28
28
29
import org .eclipse .paho .client .mqttv3 .MqttClientPersistence ;
29
30
import org .eclipse .paho .client .mqttv3 .persist .MqttDefaultFilePersistence ;
30
- import org .junit .ClassRule ;
31
- import org .junit .Test ;
32
- import org .junit .rules . TemporaryFolder ;
33
- import org .junit .runner . RunWith ;
31
+ import org .junit .jupiter . api . AfterAll ;
32
+ import org .junit .jupiter . api . BeforeAll ;
33
+ import org .junit .jupiter . api . Test ;
34
+ import org .junit .jupiter . api . io . TempDir ;
34
35
35
36
import org .springframework .beans .factory .BeanFactory ;
36
37
import org .springframework .beans .factory .annotation .Autowired ;
50
51
import org .springframework .integration .support .MessageBuilder ;
51
52
import org .springframework .integration .support .json .EmbeddedJsonHeadersMessageMapper ;
52
53
import org .springframework .integration .support .json .JacksonJsonUtils ;
54
+ import org .springframework .integration .test .condition .LongRunningTest ;
53
55
import org .springframework .messaging .Message ;
54
56
import org .springframework .messaging .MessageChannel ;
55
57
import org .springframework .messaging .MessagingException ;
56
58
import org .springframework .messaging .PollableChannel ;
57
59
import org .springframework .messaging .support .GenericMessage ;
58
60
import org .springframework .scheduling .concurrent .ThreadPoolTaskScheduler ;
59
61
import org .springframework .test .annotation .DirtiesContext ;
60
- import org .springframework .test .context .junit4 . SpringRunner ;
62
+ import org .springframework .test .context .junit . jupiter . SpringJUnitConfig ;
61
63
62
64
/**
63
65
* @author Gary Russell
66
68
* @since 4.0
67
69
*
68
70
*/
69
- @ RunWith (SpringRunner .class )
71
+ @ LongRunningTest
72
+ @ SpringJUnitConfig
70
73
@ DirtiesContext
71
- public class BackToBackAdapterTests {
74
+ public class BackToBackAdapterTests implements MosquittoContainerTest {
72
75
73
- @ ClassRule
74
- public static final BrokerRunning brokerRunning = BrokerRunning . isRunning ( 1883 ) ;
76
+ @ TempDir
77
+ static File folder ;
75
78
76
- @ ClassRule
77
- public static final TemporaryFolder folder = new TemporaryFolder ();
79
+ static ThreadPoolTaskScheduler taskScheduler ;
78
80
79
81
@ Autowired
80
82
private MessageChannel out ;
@@ -85,31 +87,40 @@ public class BackToBackAdapterTests {
85
87
@ Autowired
86
88
private EventsListener listener ;
87
89
90
+ @ BeforeAll
91
+ static void setup () {
92
+ taskScheduler = new ThreadPoolTaskScheduler ();
93
+ taskScheduler .initialize ();
94
+ }
95
+
96
+ @ AfterAll
97
+ static void teardown () {
98
+ taskScheduler .destroy ();
99
+ }
100
+
88
101
@ Test
89
102
public void testSingleTopic () {
90
- MqttPahoMessageHandler adapter = new MqttPahoMessageHandler ("tcp://localhost:1883" , "si-test-out" );
103
+ MqttPahoMessageHandler adapter = new MqttPahoMessageHandler (MosquittoContainerTest . mqttUrl () , "si-test-out" );
91
104
adapter .setDefaultTopic ("mqtt-foo" );
92
105
adapter .setBeanFactory (mock (BeanFactory .class ));
93
106
adapter .afterPropertiesSet ();
94
107
adapter .start ();
95
- MqttPahoMessageDrivenChannelAdapter inbound = new MqttPahoMessageDrivenChannelAdapter ("tcp://localhost:1883" ,
108
+ MqttPahoMessageDrivenChannelAdapter inbound = new MqttPahoMessageDrivenChannelAdapter (MosquittoContainerTest . mqttUrl () ,
96
109
"si-test-in" , "mqtt-foo" );
97
110
QueueChannel outputChannel = new QueueChannel ();
98
111
inbound .setOutputChannel (outputChannel );
99
- ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler ();
100
- taskScheduler .initialize ();
101
112
inbound .setTaskScheduler (taskScheduler );
102
113
inbound .setBeanFactory (mock (BeanFactory .class ));
103
114
inbound .afterPropertiesSet ();
104
115
inbound .start ();
105
- adapter .handleMessage (new GenericMessage <String >("foo" ));
116
+ adapter .handleMessage (new GenericMessage <>("foo" ));
106
117
Message <?> out = outputChannel .receive (20000 );
107
118
assertThat (out ).isNotNull ();
108
119
adapter .stop ();
109
120
inbound .stop ();
110
121
assertThat (out .getPayload ()).isEqualTo ("foo" );
111
122
assertThat (out .getHeaders ().get (MqttHeaders .RECEIVED_TOPIC )).isEqualTo ("mqtt-foo" );
112
- assertThat (adapter .getConnectionInfo ().getServerURIs ()[0 ]).isEqualTo ("tcp://localhost:1883" );
123
+ assertThat (adapter .getConnectionInfo ().getServerURIs ()[0 ]).isEqualTo (MosquittoContainerTest . mqttUrl () );
113
124
}
114
125
115
126
@ Test
@@ -123,7 +134,7 @@ public void testJsonNoTrust() {
123
134
}
124
135
125
136
private void testJsonCommon (String ... trusted ) {
126
- MqttPahoMessageHandler adapter = new MqttPahoMessageHandler ("tcp://localhost:1883" , "si-test-out" );
137
+ MqttPahoMessageHandler adapter = new MqttPahoMessageHandler (MosquittoContainerTest . mqttUrl () , "si-test-out" );
127
138
adapter .setDefaultTopic ("mqtt-foo" );
128
139
adapter .setBeanFactory (mock (BeanFactory .class ));
129
140
EmbeddedJsonHeadersMessageMapper mapper = new EmbeddedJsonHeadersMessageMapper (
@@ -133,18 +144,16 @@ private void testJsonCommon(String... trusted) {
133
144
adapter .setConverter (converter );
134
145
adapter .afterPropertiesSet ();
135
146
adapter .start ();
136
- MqttPahoMessageDrivenChannelAdapter inbound = new MqttPahoMessageDrivenChannelAdapter ( "tcp://localhost:1883" ,
137
- "si-test-in" , "mqtt-foo" );
147
+ MqttPahoMessageDrivenChannelAdapter inbound =
148
+ new MqttPahoMessageDrivenChannelAdapter ( MosquittoContainerTest . mqttUrl (), "si-test-in" , "mqtt-foo" );
138
149
QueueChannel outputChannel = new QueueChannel ();
139
150
inbound .setOutputChannel (outputChannel );
140
- ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler ();
141
- taskScheduler .initialize ();
142
151
inbound .setTaskScheduler (taskScheduler );
143
152
inbound .setBeanFactory (mock (BeanFactory .class ));
144
153
inbound .setConverter (converter );
145
154
inbound .afterPropertiesSet ();
146
155
inbound .start ();
147
- adapter .handleMessage (new GenericMessage <Foo >(new Foo ("bar" ), Collections .singletonMap ("baz" , "qux" )));
156
+ adapter .handleMessage (new GenericMessage <>(new Foo ("bar" ), Collections .singletonMap ("baz" , "qux" )));
148
157
Message <?> out = outputChannel .receive (20000 );
149
158
assertThat (out ).isNotNull ();
150
159
adapter .stop ();
@@ -161,22 +170,21 @@ private void testJsonCommon(String... trusted) {
161
170
162
171
@ Test
163
172
public void testAddRemoveTopic () {
164
- MqttPahoMessageHandler adapter = new MqttPahoMessageHandler ("tcp://localhost:1883" , "si-test-out" );
173
+ MqttPahoMessageHandler adapter = new MqttPahoMessageHandler (MosquittoContainerTest . mqttUrl () , "si-test-out" );
165
174
adapter .setDefaultTopic ("mqtt-foo" );
166
175
adapter .setBeanFactory (mock (BeanFactory .class ));
167
176
adapter .afterPropertiesSet ();
168
177
adapter .start ();
169
- MqttPahoMessageDrivenChannelAdapter inbound = new MqttPahoMessageDrivenChannelAdapter ("tcp://localhost:1883" , "si-test-in" );
178
+ MqttPahoMessageDrivenChannelAdapter inbound =
179
+ new MqttPahoMessageDrivenChannelAdapter (MosquittoContainerTest .mqttUrl (), "si-test-in" );
170
180
QueueChannel outputChannel = new QueueChannel ();
171
181
inbound .setOutputChannel (outputChannel );
172
- ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler ();
173
- taskScheduler .initialize ();
174
182
inbound .setTaskScheduler (taskScheduler );
175
183
inbound .setBeanFactory (mock (BeanFactory .class ));
176
184
inbound .afterPropertiesSet ();
177
185
inbound .start ();
178
186
inbound .addTopic ("mqtt-foo" );
179
- adapter .handleMessage (new GenericMessage <String >("foo" ));
187
+ adapter .handleMessage (new GenericMessage <>("foo" ));
180
188
Message <?> out = outputChannel .receive (20_000 );
181
189
assertThat (out ).isNotNull ();
182
190
assertThat (out .getPayload ()).isEqualTo ("foo" );
@@ -194,17 +202,13 @@ public void testAddRemoveTopic() {
194
202
out = outputChannel .receive (1 );
195
203
assertThat (out ).isNull ();
196
204
197
- try {
198
- inbound .addTopic ("mqtt-foo" );
199
- fail ("Expected exception" );
200
- }
201
- catch (MessagingException e ) {
202
- assertThat (e .getMessage ()).isEqualTo ("Topic 'mqtt-foo' is already subscribed." );
203
- }
205
+ assertThatExceptionOfType (MessagingException .class )
206
+ .isThrownBy (() -> inbound .addTopic ("mqtt-foo" ))
207
+ .withMessage ("Topic 'mqtt-foo' is already subscribed." );
204
208
205
209
inbound .addTopic ("mqqt-bar" , "mqqt-baz" );
206
210
inbound .removeTopic ("mqqt-bar" , "mqqt-baz" );
207
- inbound .addTopics (new String [] { "mqqt-bar" , "mqqt-baz" }, new int [] { 0 , 0 });
211
+ inbound .addTopics (new String []{ "mqqt-bar" , "mqqt-baz" }, new int []{ 0 , 0 });
208
212
inbound .removeTopic ("mqqt-bar" , "mqqt-baz" );
209
213
210
214
adapter .stop ();
@@ -213,22 +217,21 @@ public void testAddRemoveTopic() {
213
217
214
218
@ Test
215
219
public void testTwoTopics () {
216
- MqttPahoMessageHandler adapter = new MqttPahoMessageHandler ("tcp://localhost:1883" , "si-test-out" );
220
+ MqttPahoMessageHandler adapter = new MqttPahoMessageHandler (MosquittoContainerTest . mqttUrl () , "si-test-out" );
217
221
adapter .setDefaultTopic ("mqtt-foo" );
218
222
adapter .setBeanFactory (mock (BeanFactory .class ));
219
223
adapter .afterPropertiesSet ();
220
224
adapter .start ();
221
- MqttPahoMessageDrivenChannelAdapter inbound = new MqttPahoMessageDrivenChannelAdapter ("tcp://localhost:1883" ,
222
- "si-test-in" , "mqtt-foo" , "mqtt-bar" );
225
+ MqttPahoMessageDrivenChannelAdapter inbound =
226
+ new MqttPahoMessageDrivenChannelAdapter (MosquittoContainerTest .mqttUrl (),
227
+ "si-test-in" , "mqtt-foo" , "mqtt-bar" );
223
228
QueueChannel outputChannel = new QueueChannel ();
224
229
inbound .setOutputChannel (outputChannel );
225
- ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler ();
226
- taskScheduler .initialize ();
227
230
inbound .setTaskScheduler (taskScheduler );
228
231
inbound .setBeanFactory (mock (BeanFactory .class ));
229
232
inbound .afterPropertiesSet ();
230
233
inbound .start ();
231
- adapter .handleMessage (new GenericMessage <String >("foo" ));
234
+ adapter .handleMessage (new GenericMessage <>("foo" ));
232
235
Message <?> message = MessageBuilder .withPayload ("bar" ).setHeader (MqttHeaders .TOPIC , "mqtt-bar" ).build ();
233
236
adapter .handleMessage (message );
234
237
Message <?> out = outputChannel .receive (20000 );
@@ -240,12 +243,13 @@ public void testTwoTopics() {
240
243
inbound .stop ();
241
244
assertThat (out .getPayload ()).isEqualTo ("bar" );
242
245
assertThat (out .getHeaders ().get (MqttHeaders .RECEIVED_TOPIC )).isEqualTo ("mqtt-bar" );
246
+
243
247
adapter .stop ();
244
248
}
245
249
246
250
@ Test
247
251
public void testAsync () throws Exception {
248
- MqttPahoMessageHandler adapter = new MqttPahoMessageHandler ("tcp://localhost:1883" , "si-test-out" );
252
+ MqttPahoMessageHandler adapter = new MqttPahoMessageHandler (MosquittoContainerTest . mqttUrl () , "si-test-out" );
249
253
adapter .setDefaultTopic ("mqtt-foo" );
250
254
adapter .setBeanFactory (mock (BeanFactory .class ));
251
255
adapter .setAsync (true );
@@ -255,33 +259,32 @@ public void testAsync() throws Exception {
255
259
adapter .afterPropertiesSet ();
256
260
adapter .start ();
257
261
MqttPahoMessageDrivenChannelAdapter inbound =
258
- new MqttPahoMessageDrivenChannelAdapter ("tcp://localhost:1883" , "si-test-in" , "mqtt-foo" );
262
+ new MqttPahoMessageDrivenChannelAdapter (MosquittoContainerTest . mqttUrl () , "si-test-in" , "mqtt-foo" );
259
263
QueueChannel outputChannel = new QueueChannel ();
260
264
inbound .setOutputChannel (outputChannel );
261
- ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler ();
262
- taskScheduler .initialize ();
263
265
inbound .setTaskScheduler (taskScheduler );
264
266
inbound .setBeanFactory (mock (BeanFactory .class ));
265
267
inbound .afterPropertiesSet ();
266
268
inbound .start ();
267
- GenericMessage <String > message = new GenericMessage <String >("foo" );
269
+ GenericMessage <String > message = new GenericMessage <>("foo" );
268
270
adapter .handleMessage (message );
269
271
verifyEvents (adapter , publisher , message );
270
272
Message <?> out = outputChannel .receive (20000 );
271
273
assertThat (out ).isNotNull ();
272
274
adapter .stop ();
273
275
inbound .stop ();
276
+
274
277
assertThat (out .getPayload ()).isEqualTo ("foo" );
275
278
assertThat (out .getHeaders ().get (MqttHeaders .RECEIVED_TOPIC )).isEqualTo ("mqtt-foo" );
276
279
}
277
280
278
281
@ Test
279
282
public void testAsyncPersisted () throws Exception {
280
283
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory ();
281
- MqttClientPersistence persistence = new MqttDefaultFilePersistence (folder .getRoot (). getAbsolutePath ());
284
+ MqttClientPersistence persistence = new MqttDefaultFilePersistence (folder .getAbsolutePath ());
282
285
factory .setPersistence (persistence );
283
- MqttPahoMessageHandler adapter = new MqttPahoMessageHandler ( "tcp://localhost:1883" , "si-test-out" ,
284
- factory );
286
+ MqttPahoMessageHandler adapter =
287
+ new MqttPahoMessageHandler ( MosquittoContainerTest . mqttUrl (), "si-test-out" , factory );
285
288
adapter .setDefaultTopic ("mqtt-foo" );
286
289
adapter .setBeanFactory (mock (BeanFactory .class ));
287
290
adapter .setAsync (true );
@@ -293,16 +296,15 @@ public void testAsyncPersisted() throws Exception {
293
296
adapter .start ();
294
297
295
298
MqttPahoMessageDrivenChannelAdapter inbound =
296
- new MqttPahoMessageDrivenChannelAdapter ("tcp://localhost:1883" , "si-test-in" , "mqtt-foo" , "mqtt-bar" );
299
+ new MqttPahoMessageDrivenChannelAdapter (MosquittoContainerTest .mqttUrl (),
300
+ "si-test-in" , "mqtt-foo" , "mqtt-bar" );
297
301
QueueChannel outputChannel = new QueueChannel ();
298
302
inbound .setOutputChannel (outputChannel );
299
- ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler ();
300
- taskScheduler .initialize ();
301
303
inbound .setTaskScheduler (taskScheduler );
302
304
inbound .setBeanFactory (mock (BeanFactory .class ));
303
305
inbound .afterPropertiesSet ();
304
306
inbound .start ();
305
- Message <String > message1 = new GenericMessage <String >("foo" );
307
+ Message <String > message1 = new GenericMessage <>("foo" );
306
308
adapter .handleMessage (message1 );
307
309
verifyEvents (adapter , publisher1 , message1 );
308
310
@@ -334,7 +336,7 @@ public void testAsyncPersisted() throws Exception {
334
336
335
337
assertThat (publisher1 .delivered .getClientInstance ()).isNotEqualTo (clientInstance );
336
338
337
- Message <?> out = null ;
339
+ Message <?> out ;
338
340
for (int i = 0 ; i < 4 ; i ++) {
339
341
out = outputChannel .receive (20000 );
340
342
assertThat (out ).isNotNull ();
@@ -354,6 +356,7 @@ else if ("bar".equals(out.getPayload())) {
354
356
355
357
private void verifyEvents (MqttPahoMessageHandler adapter , EventPublisher publisher1 , Message <String > message1 )
356
358
throws InterruptedException {
359
+
357
360
assertThat (publisher1 .latch .await (10 , TimeUnit .SECONDS )).isTrue ();
358
361
assertThat (publisher1 .sent ).isNotNull ();
359
362
assertThat (publisher1 .delivered ).isNotNull ();
@@ -373,13 +376,13 @@ private void verifyMessageIds(EventPublisher publisher1, EventPublisher publishe
373
376
374
377
@ Test
375
378
public void testMultiURIs () {
376
- out .send (new GenericMessage <String >("foo" ));
379
+ out .send (new GenericMessage <>("foo" ));
377
380
Message <?> message = in .receive (20000 );
378
381
assertThat (message ).isNotNull ();
379
382
assertThat (message .getPayload ()).isEqualTo ("foo" );
380
383
MqttPahoComponent source = this .listener .event .getSourceAsType ();
381
- assertThat (Arrays . toString ( source .getConnectionInfo ().getServerURIs () ))
382
- .isEqualTo ( "[tcp://localhost:1883, tcp://localhost:1883]" );
384
+ assertThat (source .getConnectionInfo ().getServerURIs ())
385
+ .contains ( MosquittoContainerTest . mqttUrl (), MosquittoContainerTest . mqttUrl () );
383
386
}
384
387
385
388
public static class EventsListener implements ApplicationListener <MqttSubscribedEvent > {
0 commit comments