消息队列
消息队列用途
解耦
MQ 可以使生产者和多个消费者之间解耦合,类似于设计模式里面的观察者模式,发布订阅机制。
异步
此时 MQ 类似一个缓存队列,支持生产者异步返回,后续的处理由下游的消费者处理。
削峰
此时 MQ 还是类似一个缓存队列,高峰期的数据可以缓存在消息队列里,上游仍然可以维持高吞吐量,具体耗时的工作由下游慢慢处理。
常用 MQ 对比
RocketMQ
出自阿里,java,分布式,10万级,ms级,功能完善,topc支持到几千个不影响吞吐量,风险是阿里可能不维护,需要自己定制开发升级。
kafka
出自 apache,scala,分布式,10万级,ms级,功能简单,topic支持到几百个时吞吐量下降,风险是会重复消费,对数据准确性有轻微影响,适用于日志采集。
kafka 概述
kafka 最初由 Linkedin 公司开发,之后成为 Apache 项目的一部分,是一个分布式的发布订阅消息系统。
主要特性
- 高吞吐量、低延迟:kafka每秒可以处理几十万条消息,它的延迟最低只有几毫秒。
- 持久性、可靠性:消息被持久化到本地磁盘,并且支持数据备份防止数据丢失。
- 容错性:允许集群中节点失败(若副本数量为n,则允许n-1个节点失败)
- 高并发:支持数千个客户端同时读写
- 不完全可靠:为了高性能,降低了部分可靠性,在某些场景下,消息存在丢失和重复。
使用场景
- 基于 kafka 的特性,一般应用在日志收集,消息系统,流式处理等对吞吐量要求较高,但对可靠性要求较低的场景下。
- 与 kafka 相比,RabbitMQ/RocketMQ 更侧重于消息的可靠性,一般用于金融或电商订单业务。
消息存储
kafka 作为一个分布式存储系统,消息会以分区 (partition) 的形式保存在多个副本中,每个 broker 节点可以保存多个分区的多个副本。
消息格式
- 每个 partition 分区在 broker 上保存为一个文件目录,命名为 <topic_name>_<partition_id>。
- 每个 partition 目录下包含多个相同大小的 segment 文件,并以文件内首个消息的 offset 命名,扩展名为.log。
- segment文件内消息存储格式为: <message_size> ,每个 partition 分区的 offset 都是独立并递增的。
- 每个 segment 文件维护一个索引,扩展名.index,支持针对 offset 的二分查找。
消息删除
无论消息是否被消费,kafka 都会保存所有的消息。那对于旧数据有什么删除策略呢?
- 基于时间,默认配置是 168 小时(7 天)。
- 基于大小,默认配置是 1073741824。
- kafka 读取特定消息的时间复杂度是 O(1),所以这里删除过期的文件并不会提高 kafka 的性能,只是为了节省磁盘空间。
生产和消费
生产者写入
- 生产者通过 broker 集群获取当前分区对应的 leader,将消息通过 leader 写入分区消息队列,leader 将消息同步给其他副本。
- 写入时需要指定 topic,key 和 partition 可选,如果 partition 没有指定,则根据 key 做 hash 取模,如果 key 也没有设置,则轮询。
- 生产者可以配置为异步写入,业务线程将消息写入队列后返回,后台根据消息长度和最大等待时间执行批量发送,提高了吞吐量。
消息同步过程
- 生产者从 broker 集群中获取当前分区对应的 leader,并将消息发送给 leader。
- leader 负责将消息写入 log,并等待其他副本更新。
- 其他从 leader 同步消息,并写入 log,返回给 leader ack。
- leader 收到所有副本返回的 ack,判定消息写入成功,返回给生产者成功。
- 为提高吞吐量,默认配置为当 leader 写入成功,就返回成功,此时如果 leader 服务挂掉,会造成数据丢失。
消费者消费
- 由于 partition 分区只对应一个 consumer,超出的 consumer 节点不会分配到 partition,消费不到数据。
- consumer 从 broker 集群获取当前分区的 leader,批量 pull 消息,并提交消息的 offset,默认为 5 秒自动提交一次 offset。
offset
- kafka 集群在每个 partition 上为每个消费组维护一个已消费 offset,由消费者负责提交更新。
- 在历史版本中,offset 是维护在 zookeeper 中,新版本维护在 __consumer_offsets 这个 Topic 中。
rebalanced
rebalanced 本质上是一种协议,规定了一个 Consumer Group 下的所有 consumer 如何达成一致,来分配订阅 Topic 的每个分区。
rebalanced 的触发条件主要有2个
- 组成员个数发生变化,增加组员或者减少组员。
- 订阅的 partition 分区数发生变化。
心跳监测
集群通过消费端的两个线程来监测状态,一个是心跳线程,一个是用户 poll 线程。
- 心跳线程定时向集群发送心跳包,心跳线程用于快速监测消费端的故障,尽早 rebalance。
- 用户 poll 线程从集群循环拉取消息,如果两次 poll 的时间间隔超时(默认300s),则认定消费端故障,执行 rebalance。
高可用设计
副本机制
每个 partition 会有多个副本和一个 leader,生产者和消费者只与 leader 交互,其它副本作为 follower 从 leader 复制数据。
zookeeper
- zookeeper 用来管理 broker 的动态加入和离开,实现故障发现和 leader 选举。
- 同时管理 consumer 的动态加入与离开,producer 不需要管理,随便一台计算机都可以作为 producer 向 kakfa broker发消息。
高吞吐设计
- Kafka基于页缓存计算+磁盘顺序写,实现了写入数据的超高性能。
- 基于零拷贝技术,提高了读取数据的性能。
不可靠特性
Kafka是为了高吞吐量设计的,在满足性能的前提下,不可避免的会带来一些不可靠问题。主要体现在消息丢失和重复消费上。
消息丢失问题可以通过修改配置,牺牲性能来解决,重复消费问题无法完全避免,如果业务不能容忍消息重复,需要自己实现幂等性。
消息丢失
生产者丢失
- 生产者采用定时批量发送数据,如果期间生产者进程挂掉,消息来不及发送出去,则消息丢失。
- 解决办法是减少消息发送的最大等待时间,比如可以配置为 5ms,从而减少消息丢失的数量和几率。
集群丢失
- kafka 默认是同步写入,只要 leader 写入成功就返回成功,此时如果 leader 挂掉,其他副本还没来得及同步消息,则消息丢失。
- 解决办法是配置为等待所有副本写入成功后,才返回成功,此时会降低写入的性能,影响吞吐量。
消费者丢失
消费者设置为自动提交时,如果消息被提交后,还没来得及处理,进程挂掉,此时消息丢失,解决办法是改为手动提交,牺牲性能。
重复消费
生产者重复
- 生产者发送完消息,因为网络问题没有收到 response,此时会重发消息,造成消息重复。
消费者重复
- 消费者设置为自动提交时,如果业务层消息处理时间太久,超过了 max.poll.interval.ms(默认300s),则判定消费端故障产生 rebalance,再次 poll 时仍获取到之前的消息,导致重复。解决办法是减少 max.poll.records(poll的消息个数),尽量保证消息处理的够快。
- 在自动提交模式下,只要集群产生 rebalance,已处理过但来不及提交的消息都会被再消费一次,导致重复。
同分区消息乱序
- 生产者发送消息时,如果前一个消息未响应,可以继续发送消息,如果前一个消息最终超时导致重发,则会出现消息乱序。
- 配置 max.in.flight.requests.per.connection:限制客户端在单个连接上能够发送的未响应请求的个数。设置此值是1表示kafka broker在响应请求之前client不能再向同一个broker发送请求,但吞吐量会下降。
问答
kafka 为什么快?
- partition 并行处理
- 顺序写磁盘,充分利用磁盘特性
- 利用了现代操作系统分页存储 Page Cache 来利用内存提高 I/O 效率
- 采用了零拷贝技术
- Producer 生产的数据持久化到 broker,采用 mmap 文件映射,实现顺序的快速写入
- Customer 从 broker 读取数据,采用 sendfile,将磁盘文件读到 OS 内核缓冲区后,转到 NIO buffer进行网络发送,减少 CPU 消耗
为什么Kafka不支持读写分离?
在 Kafka 中,生产者写入消息、消费者读取消息都是与 leader 副本进行交互,是一种主写主读的生产消费模型。
Kafka 并不支持主写从读,主要原因是:
- 数据一致性问题:主节点向从节点同步消息比较耗时,导致消费的消息会滞后于主节点,产生不一致。
- 多分区机制:主从架构的目的是负载均衡,kafka本身支持多分区来实现负载均衡,主写主读实现更简单,没有必要采用主写从读。