From 07c6e875daab6f6240b9c205998b64a111941644 Mon Sep 17 00:00:00 2001 From: lv jiang er hao <1755875003@qq.com> Date: Wed, 16 Aug 2023 09:23:02 +0800 Subject: [PATCH 1/3] =?UTF-8?q?=E6=B6=88=E8=B4=B9=E8=80=85=E5=88=86?= =?UTF-8?q?=E7=BB=84=E5=92=8C=E7=94=9F=E4=BA=A7=E8=80=85=E5=88=86=E7=BB=84?= =?UTF-8?q?=E7=9B=B8=E5=85=B3=E7=9F=A5=E8=AF=86=E8=A1=A5=E5=85=85?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../message-queue/rocketmq-questions.md | 41 +++++++++++++++++++ 1 file changed, 41 insertions(+) diff --git a/docs/high-performance/message-queue/rocketmq-questions.md b/docs/high-performance/message-queue/rocketmq-questions.md index 5f2974b0cdd..cb76dd61291 100644 --- a/docs/high-performance/message-queue/rocketmq-questions.md +++ b/docs/high-performance/message-queue/rocketmq-questions.md @@ -196,6 +196,8 @@ tag: 你可以看到图中生产者组中的生产者会向主题发送消息,而 **主题中存在多个队列**,生产者每次生产消息之后是指定主题中的某个队列发送消息的。 + + 每个主题中都有多个队列(分布在不同的 `Broker`中,如果是集群的话,`Broker`又分布在不同的服务器中),集群消费模式下,一个消费者集群多台机器共同消费一个 `topic` 的多个队列,**一个队列只会被一个消费者消费**。如果某个消费者挂掉,分组内其它消费者会接替挂掉的消费者继续消费。就像上图中 `Consumer1` 和 `Consumer2` 分别对应着两个队列,而 `Consumer3` 是没有队列对应的,所以一般来讲要控制 **消费者组中的消费者个数和主题中队列个数相同** 。 当然也可以消费者个数小于队列个数,只不过不太建议。如下图。 @@ -270,6 +272,45 @@ tag: 第四、消费者通过 `NameServer` 获取所有 `Broker` 的路由信息后,向 `Broker` 发送 `Pull` 请求来获取消息数据。`Consumer` 可以以两种模式启动—— **广播(Broadcast)和集群(Cluster)**。广播模式下,一条消息会发送给 **同一个消费组中的所有消费者** ,集群模式下消息只会发送给一个消费者。 +## 关于发送消息 + +### **不建议单一进程创建大量生产者** + +Apache RocketMQ 的生产者和主题是多对多的关系,支持同一个生产者向多个主题发送消息。对于生产者的创建和初始化,建议遵循够用即可、最大化复用原则,如果有需要发送消息到多个主题的场景,无需为每个主题都创建一个生产者。 + +### **不建议频繁创建和销毁生产者** + +Apache RocketMQ 的生产者是可以重复利用的底层资源,类似数据库的连接池。因此不需要在每次发送消息时动态创建生产者,且在发送结束后销毁生产者。这样频繁的创建销毁会在服务端产生大量短连接请求,严重影响系统性能。 + +正确示例: + +```java +Producer p = ProducerBuilder.build(); +for (int i =0;i Date: Wed, 16 Aug 2023 10:38:00 +0800 Subject: [PATCH 2/3] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E4=BA=86=E5=85=B3?= =?UTF-8?q?=E4=BA=8E=E6=B6=88=E6=81=AF=E7=B1=BB=E5=9E=8B=E7=9A=84=E4=BB=8B?= =?UTF-8?q?=E7=BB=8D?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../message-queue/rocketmq-questions.md | 56 +++++++++++++++++++ 1 file changed, 56 insertions(+) diff --git a/docs/high-performance/message-queue/rocketmq-questions.md b/docs/high-performance/message-queue/rocketmq-questions.md index cb76dd61291..faeff1decaa 100644 --- a/docs/high-performance/message-queue/rocketmq-questions.md +++ b/docs/high-performance/message-queue/rocketmq-questions.md @@ -272,6 +272,56 @@ tag: 第四、消费者通过 `NameServer` 获取所有 `Broker` 的路由信息后,向 `Broker` 发送 `Pull` 请求来获取消息数据。`Consumer` 可以以两种模式启动—— **广播(Broadcast)和集群(Cluster)**。广播模式下,一条消息会发送给 **同一个消费组中的所有消费者** ,集群模式下消息只会发送给一个消费者。 +## RocketMQ功能特性 + +### 消息 + +#### 普通消息 + +普通消息一般应用于微服务解耦、事件驱动、数据集成等场景,这些场景大多数要求数据传输通道具有可靠传输的能力,且对消息的处理时机、处理顺序没有特别要求。以在线的电商交易场景为例,上游订单系统将用户下单支付这一业务事件封装成独立的普通消息并发送至RocketMQ服务端,下游按需从服务端订阅消息并按照本地消费逻辑处理下游任务。每个消息之间都是相互独立的,且不需要产生关联。另外还有日志系统,以离线的日志收集场景为例,通过埋点组件收集前端应用的相关操作日志,并转发到 RocketMQ 。 + +![](https://rocketmq.apache.org/zh/assets/images/lifecyclefornormal-e8a2a7e42a0722f681eb129b51e1bd66.png) + +**普通消息生命周期** + +- 初始化:消息被生产者构建并完成初始化,待发送到服务端的状态。 +- 待消费:消息被发送到服务端,对消费者可见,等待消费者消费的状态。 +- 消费中:消息被消费者获取,并按照消费者本地的业务逻辑进行处理的过程。 此时服务端会等待消费者完成消费并提交消费结果,如果一定时间后没有收到消费者的响应,RocketMQ会对消息进行重试处理。 +- 消费提交:消费者完成消费处理,并向服务端提交消费结果,服务端标记当前消息已经被处理(包括消费成功和失败)。RocketMQ默认支持保留所有消息,此时消息数据并不会立即被删除,只是逻辑标记已消费。消息在保存时间到期或存储空间不足被删除前,消费者仍然可以回溯消息重新消费。 +- 消息删除:RocketMQ按照消息保存机制滚动清理最早的消息数据,将消息从物理文件中删除。 + +#### 定时消息 + +在分布式定时调度触发、任务超时处理等场景,需要实现精准、可靠的定时事件触发。使用RocketMQ 的定时消息可以简化定时调度任务的开发逻辑,实现高性能、可扩展、高可靠的定时触发能力。定时消息仅支持在 MessageType为Delay 的主题内使用,即定时消息只能发送至类型为定时消息的主题中,发送的消息的类型必须和主题的类型一致。 + +基于定时消息的超时任务处理具备如下优势: + +- **精度高、开发门槛低**:基于消息通知方式不存在定时阶梯间隔。可以轻松实现任意精度事件触发,无需业务去重。 +- **高性能可扩展**:传统的数据库扫描方式较为复杂,需要频繁调用接口扫描,容易产生性能瓶颈。RocketMQ 的定时消息具有高并发和水平扩展的能力。 + +![](https://rocketmq.apache.org/zh/assets/images/lifecyclefordelay-2ce8278df69cd026dd11ffd27ab09a17.png) + +**定时消息生命周期** + +- 初始化:消息被生产者构建并完成初始化,待发送到服务端的状态。 +- 定时中:消息被发送到服务端,和普通消息不同的是,服务端不会直接构建消息索引,而是会将定时消息**单独存储在定时存储系统中**,等待定时时刻到达。 +- 待消费:定时时刻到达后,服务端将消息重新写入普通存储引擎,对下游消费者可见,等待消费者消费的状态。 +- 消费中:消息被消费者获取,并按照消费者本地的业务逻辑进行处理的过程。 此时服务端会等待消费者完成消费并提交消费结果,如果一定时间后没有收到消费者的响应,RocketMQ会对消息进行重试处理。 +- 消费提交:消费者完成消费处理,并向服务端提交消费结果,服务端标记当前消息已经被处理(包括消费成功和失败)。RocketMQ 默认支持保留所有消息,此时消息数据并不会立即被删除,只是逻辑标记已消费。消息在保存时间到期或存储空间不足被删除前,消费者仍然可以回溯消息重新消费。 +- 消息删除:Apache RocketMQ按照消息保存机制滚动清理最早的消息数据,将消息从物理文件中删除。 + +定时消息的实现逻辑需要先经过定时存储等待触发,定时时间到达后才会被投递给消费者。因此,如果将大量定时消息的定时时间设置为同一时刻,则到达该时刻后会有大量消息同时需要被处理,会造成系统压力过大,导致消息分发延迟,影响定时精度。 + +#### 顺序消息 + +顺序消息仅支持使用MessageType为FIFO的主题,即顺序消息只能发送至类型为顺序消息的主题中,发送的消息的类型必须和主题的类型一致。和普通消息发送相比,顺序消息发送必须要设置消息组。(推荐实现MessageQueueSelector的方式,见下文)。要保证消息的顺序性需要单一生产者串行发送。 + +单线程使用MessageListenerConcurrently可以顺序消费,多线程环境下使用MessageListenerOrderly才能顺序消费。 + +#### 事务消息 + +施工中。。。 + ## 关于发送消息 ### **不建议单一进程创建大量生产者** @@ -293,6 +343,10 @@ for (int i =0;i Date: Wed, 16 Aug 2023 11:03:42 +0800 Subject: [PATCH 3/3] =?UTF-8?q?=E6=B6=88=E8=B4=B9=E8=80=85=E7=B1=BB?= =?UTF-8?q?=E5=9E=8B=E7=9A=84=E4=BB=8B=E7=BB=8D?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../message-queue/rocketmq-questions.md | 67 +++++++++++++++++++ 1 file changed, 67 insertions(+) diff --git a/docs/high-performance/message-queue/rocketmq-questions.md b/docs/high-performance/message-queue/rocketmq-questions.md index faeff1decaa..c9c2347a472 100644 --- a/docs/high-performance/message-queue/rocketmq-questions.md +++ b/docs/high-performance/message-queue/rocketmq-questions.md @@ -345,7 +345,74 @@ p.shutdown(); ## 消费者分类 +### PushConsumer +高度封装的消费者类型,消费消息仅仅通过消费监听器监听并返回结果。消息的获取、消费状态提交以及消费重试都通过 RocketMQ 的客户端SDK完成。 + +PushConsumer的消费监听器执行结果分为以下三种情况: + +- 返回消费成功:以Java SDK为例,返回`ConsumeResult.SUCCESS`,表示该消息处理成功,服务端按照消费结果更新消费进度。 +- 返回消费失败:以Java SDK为例,返回`ConsumeResult.FAILURE`,表示该消息处理失败,需要根据消费重试逻辑判断是否进行重试消费。 +- 出现非预期失败:例如抛异常等行为,该结果按照消费失败处理,需要根据消费重试逻辑判断是否进行重试消费。 + +具体实现可以参见这篇文章[RocketMQ对pull和push的实现 ](http://devedmc.com/archives/1691854198138)。 + +使用PushConsumer消费者消费时,不允许使用以下方式处理消息,否则 RocketMQ 无法保证消息的可靠性。 + +- 错误方式一:消息还未处理完成,就提前返回消费成功结果。此时如果消息消费失败,RocketMQ 服务端是无法感知的,因此不会进行消费重试。 +- 错误方式二:在消费监听器内将消息再次分发到自定义的其他线程,消费监听器提前返回消费结果。此时如果消息消费失败,RocketMQ 服务端同样无法感知,因此也不会进行消费重试。 +- PushConsumer严格限制了消息同步处理及每条消息的处理超时时间,适用于以下场景: + - 消息处理时间可预估:如果不确定消息处理耗时,经常有预期之外的长时间耗时的消息,PushConsumer的可靠性保证会频繁触发消息重试机制造成大量重复消息。 + - 无异步化、高级定制场景:PushConsumer限制了消费逻辑的线程模型,由客户端SDK内部按最大吞吐量触发消息处理。该模型开发逻辑简单,但是不允许使用异步化和自定义处理流程。 + +### SimpleConsumer + +SimpleConsumer 是一种接口原子型的消费者类型,消息的获取、消费状态提交以及消费重试都是通过消费者业务逻辑主动发起调用完成。 + +一个来自官网的例子: + +```java +// 消费示例:使用 SimpleConsumer 消费普通消息,主动获取消息处理并提交。 +ClientServiceProvider provider = ClientServiceProvider.loadService(); +String topic = "YourTopic"; +FilterExpression filterExpression = new FilterExpression("YourFilterTag", FilterExpressionType.TAG); +SimpleConsumer simpleConsumer = provider.newSimpleConsumerBuilder() + // 设置消费者分组。 + .setConsumerGroup("YourConsumerGroup") + // 设置接入点。 + .setClientConfiguration(ClientConfiguration.newBuilder().setEndpoints("YourEndpoint").build()) + // 设置预绑定的订阅关系。 + .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression)) + // 设置从服务端接受消息的最大等待时间 + .setAwaitDuration(Duration.ofSeconds(1)) + .build(); +try { + // SimpleConsumer 需要主动获取消息,并处理。 + List messageViewList = simpleConsumer.receive(10, Duration.ofSeconds(30)); + messageViewList.forEach(messageView -> { + System.out.println(messageView); + // 消费处理完成后,需要主动调用 ACK 提交消费结果。 + try { + simpleConsumer.ack(messageView); + } catch (ClientException e) { + logger.error("Failed to ack message, messageId={}", messageView.getMessageId(), e); + } + }); +} catch (ClientException e) { + // 如果遇到系统流控等原因造成拉取失败,需要重新发起获取消息请求。 + logger.error("Failed to receive message", e); +} +``` + +SimpleConsumer适用于以下场景: + +- 消息处理时长不可控:如果消息处理时长无法预估,经常有长时间耗时的消息处理情况。建议使用SimpleConsumer消费类型,可以在消费时自定义消息的预估处理时长,若实际业务中预估的消息处理时长不符合预期,也可以通过接口提前修改。 +- 需要异步化、批量消费等高级定制场景:SimpleConsumer在SDK内部没有复杂的线程封装,完全由业务逻辑自由定制,可以实现异步分发、批量消费等高级定制场景。 +- 需要自定义消费速率:SimpleConsumer是由业务逻辑主动调用接口获取消息,因此可以自由调整获取消息的频率,自定义控制消费速率。 + +### PullConsumer + +施工中。。。 ## 消费者分组和生产者分组