Skip to content

rocketmq

rocketmq如何保证消息不丢失

  1. 从Producer的视角来看:如果消息未能正确的存储在MQ中,或者消费者未能正确的消费到这条消息,都是消息丢失

  2. 从Broker的视角来看:如果消息已经存在Broker里面了,如何保证不会丢失呢(宕机、磁盘崩溃)

  3. 从Consumer的视角来看:如果消息已经完成持久化了,但是Consumer取了,但是未消费成功且没有反馈,就是消息丢失

  4. 从Producer分析:如何确保消息正确的发送到了Broker?

    1. 可以通过同步的方式阻塞式的发送,check SendStatus,状态超时或者失败,则会触发默认的2次重试
    2. 采取事务消息的投递方式,但是如果消息发送Ack失败的话,此消息会存储在CommitLog当中,但是对ConsumerQueue是不可见的
  5. 从Broker分析:如果确保接收到的消息不会丢失?

    1. 消息支持持久化到Commitlog里面,即使宕机后重启,未消费的消息也是可以加载出来的
    2. Broker自身支持同步刷盘、异步刷盘的策略,可以保证接收到的消息一定存储在本地的内存中
    3. Broker集群支持 1主N从的策略,支持同步复制和异步复制的方式,同步复制可以保证即使Master 磁盘崩溃,消息仍然不会丢失
  6. 从Cunmser分析:如何确保拉取到的消息被成功消费?

    1. Consumer自身维护一个持久化的offset,标记已经成功消费或者已经成功发回到broker的消息下标
    2. 如果Consumer消费失败,那么它会把这个消息发回给Broker,发回成功后,再更新自己的offset
    3. 如果Consumer和broker一起挂了,消息也不会丢失,因为consumer 里面的offset是定时持久化的,重启之后,继续拉取offset之前的消息到本地

消息重复

  1. 消费端处理消息的业务逻辑保持幂等性
  2. 保证每条消息都有唯一编号且保证消息处理成功与去重表的日志同时出现

RocketMQ不保证消息不重复,如果你的业务需要保证严格的不重复消息,需要你自己在业务端去重

mq的事物:

消息发送到broker,此时comsumer不能消费,当producer处理完事物,consumer才可以消费

引出一个问题:

如果提交一半消息,本地事物执行完成,再次提交时候,mq集群整个挂了那后续的事物就完成不了 回复:借鉴另一个mq机制:qmq所采用的本地事物表

  1. 本地事物执行完成,会插入本地事物表,这个表存的是未发送的消息,或是未发送成功的消息
  2. 如果二次提交失败,会有一个自动扫描的工具,在库中找到这个消息,并继续调用不通过消息队列的方式去调用另一个服务

mq当broker增加的时候历史未消费消息如何分配

新增broker节点之后,会进行reblance,reblance主要操作是将原市所有的队列进行重新分配,但是历史消息不会重新分配;即:Broker 会尽可能地将队列均匀地分配到各个 Broker 节点上,但是这不会涉及历史消息的重新分配。

主从同步方式?

kafka

  1. broker:存储
  2. topic:消息分类
  3. partition:topic的分区,一个topic有多个partition,不同的partition分别部署在不同的broker上
    1. 这里提高消息的吞吐量,producer通过随机或者hash的方式将消息分发到不同的partition上
  4. offset:消息在日志上的位置,代表消息的唯一序号
  5. 其中consumer消费者必须属于一个group
  6. zk 元数据中心,用于存储broker、topic、partition 等 meta 数据; 另外,还负责 broker 故障发现,partition leader 选举,负载均衡等功能

partition

数据格式,par中的数据每一条都存在三个属性

  1. offset:数据的偏移量,相当于message的标志位
  2. messageSize:data的大小
  3. data:具体内容
  4. 物理上:通过由多个segment文件组成,每个segment数据文件是以该段最小的offset命名 + .log结尾;数据进行查找的时候通过二分查找的方式找到具体的segment

支持批量发送

producer可以在发送端一次在内存中合并多条消息,一次进行批量推送给broker,降低实时性的要求,提高吞吐量

压缩

produc:通过 GZIP 或 Snappy 格式对消息集合进行压缩 consumer:对数据解压 大数据场景下适用,因为大数据传输的瓶颈是在网络而不是cpu,cpu来用于数据的解压

时间轮的定时任务

Kafka 不删除已消费的消息对于 partition,顺序读写磁盘数据,以时间复杂度 O(1)方式提供消息持久化能力。

gpt对一个partation是否可以由多个consumer消费有异议 现在的理解:partation和queue都是只可以有consumer group中的某一个consumer消费。保证数据的顺序性;(此处还要看专业文档)

kafka的事物:

Kafka 事务消息则是用在一次事务中需要发送多个消息的情况,保证多个消息之间的事务约束,即多条消息要么都发送成功,要么都失败 Kafka 的事务基本上是配合其幂等机制来实现 Exactly Once 语义的 不是我们想的那种事务消息 流程:

  1. Kafka 的事务有事务协调者角色,事务协调者其实就是 Broker 的一部分
  2. 生产者会向事务协调者发起请求表示事务开启,事务协调者会将这个消息记录到特殊的日志-事务日志,然后生产者再发送真正想要发送的消息,Kafka 会像对待正常消息一样处理这些事务消息,由消费端来过滤这个消息
  3. 发送完毕之后生产者会向事务协调者发送提交或者回滚请求,由事务协调者来进行两阶段提交,如果是提交那么会先执行预提交,即把事务的状态置为预提交然后写入事务日志,然后再向所有事务有关的分区写入一条类似事务结束的消息,这样消费端消费到这个消息的时候就知道事务好了,可以把消息放出来了

消息可靠性

最多一次、恰好一次、最少一次 了基本上我们都是用最少一次然后配合消费者端的幂等来实现恰好一次。

rocketmq推拉模式

  1. rocketmq的推模式其实就是拉模式,
  2. 客户端做场轮询,导致像推模式一样没有延迟
  3. 作为consumer 有着pollmessageService;不停的轮询向brocker做请求
  4. 作为broker: 有着pollmessageProcesser:收到请求后会比较当前commitlog的id与请求中offset的id。是否有偏移
    1. 若存在偏移则会将数据返回给consumer;
    2. 若不存在,会将请求置于list集合中,每5s钟进行轮训,查看偏移量与请求的参数比对
    3. 为了解决5s的时间。brocker有自己的线程ReputMessageService,这个线程会不断地从 commitLog 中解析数据并分发请求
    4. 最终broker都会调用notifyMessageArriving,将数据返回给consumer

kfk推拉模式

  1. 与rocketmq类似
  2. 收线consumer调用的是nio的reactor模式将请求注册到selector上等待相应,设置等待时间然后超时继续请求
  3. kafk长轮训,收到消息判断是否有消息,没有消息则用定时任务监听请求,时间轮的方式
  4. 然后当有数据到来,返回请求

总结: RocketMQ 和 Kafka 都是采用“长轮询”的机制,具体的做法都是通过消费者等待消息,当有消息的时候 Broker 会直接返回消息,如果没有消息都会采取延迟处理的策略,并且为了保证消息的及时性,在对应队列或者分区有新消息到来的时候都会提醒消息来了,及时返回消息。是消费者和 Broker 相互配合,拉取消息请求不满足条件的时候 hold 住,避免了多次频繁的拉取动作,当消息一到就提醒返

基于 VitePress 构建