RocketMQ - 顺序消息

概述

顺序消息:生产者按照顺序发送消息,并且消费者按照发送的顺序来消费消息。

典型例子:订单的场景从 创建订单 —— 支付 —— 发货 —— 完结订单。

从实际应用来看,一个Topic是分多个队列的,每个队列都有相应的消费者进行消费。

全局顺序消息:如果把创建订单、支付、发货、完结订单,这几类消息全都发往一个队列。

分区顺序消息:如果把同一笔订单的创建、支付、发货、完结发往一个队列,不同的订单可以发往不同队列。

//分区顺序消息实现:根据sharding key(orderId)来选择队列
int queueIndex = orderId % queue.size();
producer.send(msg, queueIndex);
1
2
3

采用分区顺序消息比较好,能够提高并发度,加快消息的消费速率。

实现原理

发消息的顺序性

首先需要保证是单个生产者来发送顺序消息

现在生产环境基本上都是多服务部署,如果有多个生产者分布在不同的服务中,都往同一个Topic发送相关的顺序消息,那么压根无法保证消息的顺序性。

即使在因果上他们产生的顺序是对的,但是最终发送到Broker这个过程的顺序是无法把控的(可能产生消息的时间早,但是实际发送的时间晚,还有网络上的传输顺序也是不可预测的)。

保证单个生产者后,还需保证单个生产者内对顺序消息是单线程(串行)发送的

RocketMQ的生产者是支持多线程发送消息的(线程安全),因此在使用上如果我们利用多线程提高发送顺序消息的速率,理论上就无法保证绝对的顺序。

比如消息-1和消息-2都发往顺序Topic-A,消息-1比消息-2在顺序上领先,它也的确比消息-2更早产生,消息-1也更早的被线程-A处理,随后消息-2才产生且被线程-B处理。

即使在这种情况下,都无法保证消息-1一定比消息-2早发送到Broker上,因为线程会被调度,可能线程-A执行一半就停了,线程-B还在执行,这样一来先启动的并不一定先执行完成,这就是多线程的不确定性。

也就是多线程是无法保证顺序的,因此在发送时候,需要单线程串行发送有关的顺序消息

所以对生产者来说,发送顺序消息需要保证两点**:单个生产者和串行发送**。

存储消息的顺序性

在存储上我们知道消息是按照时间顺序追加写入到commitlog中的,且会被分发到consumeQueue中。

同消费组内,一个consumeQueue仅会被一个消费者消费,且这个消费者会按照consumeQueue内存储的顺序来消费消息,因此我们仅需让相关的顺序消息都分配到同一个consumeQueue即可,这样存储上就是有序的。

同一笔订单相关的创建、支付、发货都发往同一个队列即可:

那如何让相关的顺序消息投递到同一个consumeQueue中?

发送顺序消息的时候指定队列行了

SendResult sendResult = producer.send(msg, newMessageQueueSelector(){
@Override
public MessageQueue select(List<MessageQueue> mqs,Messagem sg,Object arg)
{
    Integer id = (Integer)arg; //订单号
    int index = id % mqs.size(); //对队列取余
    return mqs.get(index);  //选择队列
}
} ,orderId);
1
2
3
4
5
6
7
8
9

利用MessageQueueSelector来选择队列,select方法里面的mqs就是Topic下所有队列,通过orderId取余队列数使得一样的订单都会被发往相同的队列,这样就保证分区顺序消息。

如果我们要保证所有订单处理的顺序,那么直接写死选择一个队列即可,也就是让所有的订单相关消息都发往同一个队列,这样叫全局顺序消息。

全局顺序消息的性能比较低,因为只能发往一个队列,这个并发度已经限制死了。因为在集群模式下,同一消费组内,一个队列只能对应有一个消费者来消费

而分区顺序消息的并发度取决于Topic下的队列数,因此如果想提升性能仅需增加队列数即可,不同队列之间可以并发处理顺序消息,互不影响,性能较好。

一般在业务上,我们很少用到全局顺序消息,一般而言分区顺序消息就够用了。

消费消息的顺序性

首先消费者需要保证单线程消费顺序消息,如果消费者是多线程消费消息,那么道理同生产者多线程发送消息一样,无法保证消息的顺序性。

并且还需要考虑异常的情况,也就是消费失败场景的处理。正常消息如果消费失败会进行重试,重试16次后会进入死信队列,后续人工介入 处理,不会堵着后面的消息。

而对于顺序消息来说,如果前置的消息消费失败了,后续的消息能正常消费吗?也就是说重试多次后,能直接进入死信队列吗?跳过前置消息,后面的消息能正常消费吗?很多情况下前面的消息消费失败,后续的消息是无法正常消费的,如果在逻辑上没有做处理很容易造成脏数据

因此顺序消息的处理在业务上可能需要支持相关联的消息都直接失败,然后可以在另外的地方持久化保存这些消息,待后续修复后可以重新消费。

也就是不抛错,先记着不处理,不堵着后续无关的消息,后面可以人工介入或者其他方式去处理这些失败的消息。

反正在顺序消息场景下因为重平衡机制,导致两个消费者消费同一个队列的消息,不仅导致重复消费,也可能使得顺序不一致。因此需要一定的机制来避免这种情况的发生。

在消费场景,RocketMQ利用三把锁来尽可能地保证消息的消费顺序性。很巧的是这三把锁,覆盖了我们常见的三种锁:分布式锁、Synchronized、ReentrantLock。

分布式锁:保证了一个队列只会被分配给一个消费者 本地锁:保证了同一时刻只有一个线程去消费这个队列。 ReentrantLock锁(可重入的):这把锁更像一个标记位,来表明当前这个队列还有消息在消费,无法重平衡,等待下一次重平衡。