RocketMQ - 消息发送
概述
按照RocketMQ的说明,生产者能够发送的消息类型有以下五类:
- 普通消息
- 顺序消息
- 延迟消息
- 批量消息
- 事务消息
普通消息
普通消息即除去其他几个消息类型特性的消息。
消息发送的三种方式:
同步消息
同步消息:生产者发送一条消息给Broker,需要等待Broker返回响应,然后才会继续发送后续的消息。
假如消息发送失败,默认会重试三次,如果三次之后还是失败就需要记录异常日志进行处理。通过这种方式,保证发送消息一定被接收了,类似TCP的三次握手,一个请求对应一个ACK,以收到ACK作为发送成功的标志。
异步消息
异步消息:发送异步消息时,生产者不需要阻塞等待着上一条消息的返回,它可以紧接着发送后续的消息。
异步消息需要提供onSuccess和onException的两个方法,会有另外的线程来处理Broker的响应,同样异步消息也可以设置重试次数。
它跟同步消息的区别主要在于场景的应用,同步消息需要等待前一条消息的响应才能继续发后面的消息,而异步消息不需要等待。
因此,在对响应时间敏感的场景下,异步消息比较合适,因为生产者不需要等待消息的响应可以直接处理后续的消息发送。
单向消息
单向消息:生产者只管发送消息,至于Broker有没有收到消息,生产者不关心,不需要等待响应。
特点:不需要等待响应,发完就完事,发送的耗时很短,且不需要异步线程来等待Broker,这样一来系统能同时承载更多的消息发送,性能会比较好。
应用场景:消息可靠性要求不高:日志的收集,日志的量级很大,丢失几条消息没影响。
顺序消息
延迟消息
延迟消息:生产者发送了消息,但是并不想立马被消费者消费,希望延迟一段时间后才能被消费。
比如订单取消场景,一般下单后,如果15分钟没有支付,这笔订单就自动被取消。
这个场景可以在下单时候同时发送一个订单取消的延迟消息,时间是15分钟,这样15分钟后消费者就能收到这个消息,然后看看此时的订单有没有被支付,如果没有被支付,那么就执行订单取消的逻辑。
实际上延迟消息一开始不放在正常的Topic中,RocketMQ专门搞了个Topic名为SCHEDULE_TOPIC_XXX,将所有延迟消息都放在这个Topic下。
然后有个定时任务来扫描遍历消息的延迟时间到了没,如果到了,那么再把延迟消息发往它本身的Topic队列中。
这样就保证了延迟消息到时间之前,消费者不会消费到这个消息(因为消费者根本就没有订阅SCHEDULE_TOPIC_XXX),然后一到时间,消息就被投递到原来的Topic上,这样消费者就能消费到了。
这样的设计就复用了本身关于Topic、队列还有消费者消费消息的逻辑。
在RocketMQ中,延迟的时间是无法自定义的,是有固定的阶梯性限制,在发送消息时候,只能设定投递等级,不同等级固定对应一个延迟时间。
实现原理
Broker收到这个消息后,一看delayTimeLevel设置了值,那么就知道它是一个延迟消息,于是把消息的原Topic和对应队列ID保存在消息扩展属性里面。
这样一来延迟消息就存储好了。然后Broker起了一个定时线程池,里面一共有18个核心线程,这个线程池的任务就是定时调度 SCHEDULE_TOPIC_xxxx下的每个队列的消息,一旦有到期的消息,就分发到原Topic供消费者消费。
具体的做法是在初始时,每个队列都会对应被创建一个任务扔到线程池中,这些任务的内容就是根据传入的队列ID,得到对应的consumeQueue,当然还有对应的offset。
Broker会定时保存SCHEDULE_TOPIC_xxxx下consumeQueue的消费offset。得到consuemrQueue和offset对应的就能获取延迟消息,这时候将延迟时间跟当前时间对比,就能判断是否到期。
如果到期了,就从消息扩展属性里面获取原Topic和对应队列ID,然后投递到原队列中。
然后在代码上的实现是立马新建一个任务扔到线程池中,延迟时间是100ms,任务的入参会塞入更新后的offset,这样线程就会继续消费后面的消息,如此往复循环。
当然,如果拿到的对应延迟消息还未到时间,那么offset不变,也立马新建一个任务塞入到线程池中,这样100ms后又会来看这个消息是否到期。
可以看到,整个延迟消息设计就加了一个线程池,很巧妙地复用了正常消息的commitlog和comsumeQueue的存储机制,且利用发布订阅的特性,改变消息的Topic来使得消费者无法消费到未到时间的消息。
到时间了又投递回原Topic使得消费者可以消费到期的消息,非常nice!
存在的问题
从实现层面来看,大大减少了延迟消息开发的复杂度,但是这样的实现对延迟时间来说是不准的。
首先,同一个延迟level的消息都入同一个队列,然后上一个延迟消息处理完之后继续处理下一个,如果同一时刻有大量的同一个level的延迟消息产生,那么它们都堆积在一个队列里面,一个一个处理,这样一来即使后面的消息到时间了也得排队等着。
这样的机制就做不到非常实时。并且从SCHEDULE_TOPIC_xxxx分发至原Topic之后,假设原Topic本身就已经有很多消息堆积了,那么等消费者消费到这条消息的时候,时间也是有大大延迟的。
当然,本身在大流量下对时间的把控是无法做到很准确的,不论什么方式,都会有延迟,无非就是延迟精度多少的问题。可以考虑时间轮算法。
批量消息
批量消息:一次性打包发送多条消息,在对吞吐量敏感的场景,批量消息非常合适。
正常的消息是一条一条发,然后一条一条的等待响应。而批量消息是一批一批发,比如100条消息,本来需要调用100次发送接口,且需要等待100次响应。(思想:请求合并发送)
现在将这100条消息打包成1条消息发送,这样是不是仅需调用1次发送接口,且等待1次响应。
优点:提高吞吐量。 缺点:如果其中一条数据出错,可能需要一批重来了,处理起来也会比较麻烦。
事务消息
RocketMQ实现事务消息的原理:在事务开始时,发送一条半消息(half message)给Broker,所谓的半消息就是不完整的消息,这种消息不会被消费者消费到。
然后执行本地事务,在举例的场景就是下单的一系列操作。最后根据本地事务的执行结果来决定是向Broker发送提交消息,还是发送回滚消息。
- 如果发送提交消息,那么半消息就会变“完整",即可被消费者消息,最终消费者消费这条消息,整个分布式事务就完整了,保证了最终一致性。
- 如果发送回滚消息,那么这条半消息就废了,不会被消费者消费到,这就跟本地事务结果保持一致。
- RocketMQ还设计了一个反查机制:Broker会向发送的生产者来反查这个事务有没有执行成功,没有成功就返回回滚。
事务消息的整体流程:
事务消息使用跟延时消息用一样的套路。即发送半消息的的时候,发往的不是原先的Topic,而是将发往特定的Topic:RMQ_SYS_TRANS_HALF_TOPIC。
同样还是偷梁换柱,将原先的Topic和队列存储在属性里,替换Topic为RMQ_SYS_TRANS_HALF_TOPIC,队列默认为0。
这样一来消息被存储后也不会被消费者消息。
然后等待生产者的提交或者回滚事务的请求,如果收到提交,那么从属性中获取消息原先的Topic和队列,将消息发往原Topic即往commitlog里面存储这条消息,这样消费者就能消费到了。
如果是回滚,那么就不往commitlog里面存储,这样消费者就不会消费到,等同于事务回滚了。
并且Broker起了一个定时线程TransactionalMessageCheckService服务,它会定时的扫描RMQ_SYS_TRANS_HALF_TOPIC这个Topic下的消息,去请求生产者的反查接口看看事务成功了没,如果成功就恢复原先的Topic供消费者消费,失败的话就不重新投递。