@@ -345,7 +345,74 @@ p.shutdown();
345
345
346
346
## 消费者分类
347
347
348
+ ### PushConsumer
348
349
350
+ 高度封装的消费者类型,消费消息仅仅通过消费监听器监听并返回结果。消息的获取、消费状态提交以及消费重试都通过 RocketMQ 的客户端SDK完成。
351
+
352
+ PushConsumer的消费监听器执行结果分为以下三种情况:
353
+
354
+ - 返回消费成功:以Java SDK为例,返回` ConsumeResult.SUCCESS ` ,表示该消息处理成功,服务端按照消费结果更新消费进度。
355
+ - 返回消费失败:以Java SDK为例,返回` ConsumeResult.FAILURE ` ,表示该消息处理失败,需要根据消费重试逻辑判断是否进行重试消费。
356
+ - 出现非预期失败:例如抛异常等行为,该结果按照消费失败处理,需要根据消费重试逻辑判断是否进行重试消费。
357
+
358
+ 具体实现可以参见这篇文章[ RocketMQ对pull和push的实现 ] ( http://devedmc.com/archives/1691854198138 ) 。
359
+
360
+ 使用PushConsumer消费者消费时,不允许使用以下方式处理消息,否则 RocketMQ 无法保证消息的可靠性。
361
+
362
+ - 错误方式一:消息还未处理完成,就提前返回消费成功结果。此时如果消息消费失败,RocketMQ 服务端是无法感知的,因此不会进行消费重试。
363
+ - 错误方式二:在消费监听器内将消息再次分发到自定义的其他线程,消费监听器提前返回消费结果。此时如果消息消费失败,RocketMQ 服务端同样无法感知,因此也不会进行消费重试。
364
+ - PushConsumer严格限制了消息同步处理及每条消息的处理超时时间,适用于以下场景:
365
+ - 消息处理时间可预估:如果不确定消息处理耗时,经常有预期之外的长时间耗时的消息,PushConsumer的可靠性保证会频繁触发消息重试机制造成大量重复消息。
366
+ - 无异步化、高级定制场景:PushConsumer限制了消费逻辑的线程模型,由客户端SDK内部按最大吞吐量触发消息处理。该模型开发逻辑简单,但是不允许使用异步化和自定义处理流程。
367
+
368
+ ### SimpleConsumer
369
+
370
+ SimpleConsumer 是一种接口原子型的消费者类型,消息的获取、消费状态提交以及消费重试都是通过消费者业务逻辑主动发起调用完成。
371
+
372
+ 一个来自官网的例子:
373
+
374
+ ``` java
375
+ // 消费示例:使用 SimpleConsumer 消费普通消息,主动获取消息处理并提交。
376
+ ClientServiceProvider provider = ClientServiceProvider . loadService();
377
+ String topic = " YourTopic" ;
378
+ FilterExpression filterExpression = new FilterExpression (" YourFilterTag" , FilterExpressionType . TAG );
379
+ SimpleConsumer simpleConsumer = provider. newSimpleConsumerBuilder()
380
+ // 设置消费者分组。
381
+ .setConsumerGroup(" YourConsumerGroup" )
382
+ // 设置接入点。
383
+ .setClientConfiguration(ClientConfiguration . newBuilder(). setEndpoints(" YourEndpoint" ). build())
384
+ // 设置预绑定的订阅关系。
385
+ .setSubscriptionExpressions(Collections . singletonMap(topic, filterExpression))
386
+ // 设置从服务端接受消息的最大等待时间
387
+ .setAwaitDuration(Duration . ofSeconds(1 ))
388
+ .build();
389
+ try {
390
+ // SimpleConsumer 需要主动获取消息,并处理。
391
+ List<MessageView > messageViewList = simpleConsumer. receive(10 , Duration . ofSeconds(30 ));
392
+ messageViewList. forEach(messageView - > {
393
+ System . out. println(messageView);
394
+ // 消费处理完成后,需要主动调用 ACK 提交消费结果。
395
+ try {
396
+ simpleConsumer. ack(messageView);
397
+ } catch (ClientException e) {
398
+ logger. error(" Failed to ack message, messageId={}" , messageView. getMessageId(), e);
399
+ }
400
+ });
401
+ } catch (ClientException e) {
402
+ // 如果遇到系统流控等原因造成拉取失败,需要重新发起获取消息请求。
403
+ logger. error(" Failed to receive message" , e);
404
+ }
405
+ ```
406
+
407
+ SimpleConsumer适用于以下场景:
408
+
409
+ - 消息处理时长不可控:如果消息处理时长无法预估,经常有长时间耗时的消息处理情况。建议使用SimpleConsumer消费类型,可以在消费时自定义消息的预估处理时长,若实际业务中预估的消息处理时长不符合预期,也可以通过接口提前修改。
410
+ - 需要异步化、批量消费等高级定制场景:SimpleConsumer在SDK内部没有复杂的线程封装,完全由业务逻辑自由定制,可以实现异步分发、批量消费等高级定制场景。
411
+ - 需要自定义消费速率:SimpleConsumer是由业务逻辑主动调用接口获取消息,因此可以自由调整获取消息的频率,自定义控制消费速率。
412
+
413
+ ### PullConsumer
414
+
415
+ 施工中。。。
349
416
350
417
## 消费者分组和生产者分组
351
418
0 commit comments