kafka知识点整理
kafka和RabbitMq的对比
存储上:
- kafka支持了分布式存储,也就是分区。相当于把原来rabbitMq 的queue拆分成了很多个小队列,分布式存储在不同的服务上,提高了消息的发送的效率(一次可以发到多个分区),提高了消费效率,不同分区,使用不同的消费者,提高了并发处理能力。
应用场景上:
- RabbitMQ,遵循AMQP协议,用在实时的对可靠性要求比较高的消息传递上。
- kafka它主要用于处理活跃的流式数据,大数据量的数据处理上。
吞吐量上:
- rabbitMQ在吞吐量方面稍逊于kafka,他们的出发点不一样,rabbitMQ支持对消息的可靠的传递,支持事务,不支持批量的操作
- kafka具有高的吞吐量,内部采用消息的批量处理,zero-copy机制,数据的存储和获取是本地磁盘顺序批量操作,具有O(1)的复杂度,消息处理的效率很高。
在可用性方面:
- rabbitMQ支持miror的queue,主queue失效,miror queue接管。
- kafka的broker支持主备模式。
在集群负载均衡方面:
- rabbitMQ的负载均衡需要单独的loadbalancer进行支持。
- kafka采用zookeeper对集群中的broker、consumer进行管理,可以注册topic到zookeeper上;通过zookeeper的协调机制,producer保存对应topic的broker信息,可以随机或者轮询发送到broker上;并且producer可以基于语义指定分片,消息发送到broker的某分片上。
消息的删除时间上:
- rabbitMQ 消费完了就删除。
- kafka一般不会删除消息,不管这些消息有没有被消费。只会根据配置的日志保留时间(log.retention.hours)确认消息多久被删除,默认保留最近一周的日志消息。这些日志可以被重复读取和无限期保留。
为什么要对Topic下数据进行分区存储?
- commit log文件会受到所在机器的文件系统大小的限制,分区之后可以将不同的分区放在不同的机器上,相当于对
数据做了分布式存储,理论上一个topic可以处理任意数量的数据。
2、为了提高并行度,consumer可以并行去处理不同分区拿到的数据。
怎么理解Topic,Partition和Broker ?
一个topic,代表逻辑上的一个业务数据集,比如按数据库里不同表的数据操作消息区分放入不同topic。
订单相关操作消息放入订单topic,用户相关操作消息放入用户topic,对于大型网站来说,后端数据都是海量的,订单消息很可能是非常巨量的,比如有几百个G甚至达到TB级别,如果把这么多数据都放在一台机器上可定会有容量限制问题,那么就可以在 topic内部划分多个partition来分片存储数据。
不同的partition可以位于不同的机器上,每台机器上都运行一个Kafka的进程Broker。
kafka的消费者是**pull(拉)还是push(推)**模式,这种模式有什么好处?
Kafka 遵循了一种大部分消息系统共同的传统的设计:producer 将消息推送到 broker,consumer 从broker 拉取消息。
优点:pull模式消费者自主决定是否批量从broker拉取数据,而push模式在无法知道消费者消费能力情况下,不易控制推送速度,太快可能造成消费者奔溃,太慢又可能造成浪费。
缺点:如果 broker 没有可供消费的消息,将导致 consumer 不断在循环中轮询,直到新消息到到达。为了避免这点,Kafka 有个参数可以让 consumer阻塞直到新消息到达,当然也可以阻塞直到消息的数量达到某个特定的量这样就可以批量发送。
集群消费
- leader处理所有的针对这个partition的读写请求。
- 而followers被动复制leader的结果,不提供读写(主要是为了保证多副本数据与消费的一致性)。如果这个leader失效了,其中的一个follower将会自动的变成新的leader。
消费顺序怎么保证?
方案一:
发送端:发送消息的时候,往一个partition 中去发。
如果发送端配置了重试机制,就可能出现发送方发送时是1,2,3,但1发送失败,重试发送1,这样收到的消息就是2,3,1。这种情况下,需要同步的去发消息,只有第一个消息发送成功了,再去发送2,3。
消费端:取消息的时候也从一个partition 中去取。一个partition同一个时刻,在一个consumer group里只能有一个consumer去消费,从而保证消费顺序。
方案二:
- 发送端:发送消息的时候,往多个partition 中去发。
- 消费端:只设置一个consumer,消息来了不立即处理,而是自己逻辑排序后处理。搞一个类似CountDownLatch,比如总共有3条消息,就只有当3条消息都接受到了后,才进行处理,consumer端自己可以在业务逻辑中排序,比如给消息里加上一个type类型,检测是下单的消息,就优先处理,然后再处理减库存,通知快递。如果考虑到性能还不够高,可以在consumer端,考虑增加多线程去处理。
注意:
- consumer group中保证,consumer 的数量 < partition的数量,否则多出的consumer得不到信息。
- 如果有在总体上保证消费顺序的需求,那么我们可以通过将topic的partition数量设置为1,将consumer group中的consumer 数量也设置为1,但是这样会影响性能,所以kafka的顺序消费很少用。
消息积压和解决
1.生产者发送流量太大
- 降低消息生产的速度。生产者端产生消息的速度通常是跟业务息息相关的,一般情况下不太好直接优化。但是可以采用批量发送消息的方式,降低IO频率。
2. 消费者能力不足
- 修改消费端程序,写一个临时的分发数据的 consumer 程序,将收到的消息快速转发给指定的新创建的topic,给这个topic设置原来10倍的分区。临时征用 10 倍的机器来部署 consumer,每一个 consumer 消费一个新主题的分区。
- 将消息快速转录。保存到数据库或者Redis,然后再慢慢进行处理。
- kafka消费能力不足,则可以考虑增加Topic的Partition的个数,同时提升消费者组的消费者数量。
3. 数据格式变动或者消费者有bug
- 将消费不成功的消息转发到其他队列里,类似死信队列,后面再慢慢分析。kafka没有死信队列需要自己实现。
kafka 如何不消费重复数据?
重复数据的产生:
发送方:发送消息如果配置了重试机制,比如网络抖动时间过长导致发送端发送超时,实际broker可能已经接收到消息,但发送方会重新发送消息
消费方:如果消费这边配置的是自动提交,刚拉取了一批数据处理了一部分,但还没来得及提交,服务挂了,下次重启又会拉取相同的一批数据重复处理。
解决方式:
消费消息服务做幂等校验,比如:数据库/redis:设置一个唯一的id去标志这条消息,消费时去查询一下,如果已经消费过了,则不进行消费。
将
enable.auto.commit
参数设置为 false,关闭自动提交,开发者在代码中手动提交 offset。那么这里会有个问题:什么时候提交offset合适?
- 处理完消息再提交:依旧有消息重复消费的风险,和自动提交一样
- 拉取到消息即提交:会有消息丢失的风险。允许消息延时的场景,一般会采用这种方式。然后,通过定时任务在业务不繁忙(比如凌晨)的时候做数据兜底。
6 Zookeeper 在 Kafka 中的作用?
ZooKeeper 主要为 Kafka 提供元数据的管理的功能。
Broker 注册 :在Zookeeper上会有一个专门用来进行Broker服务器列表记录的节点/brokers/ids,每个Broker在启动时,都会到Zookeeper上进行注册,即到/brokers/ids下创建属于自己的节点,如/brokers/ids/[0…N]。Kafka使用了全局唯一的数字来指代每个Broker服务器,不同的Broker必须使用不同的Broker ID进行注册,创建完节点后,每个Broker就会将自己的IP地址和端口信息记录到该节点中去。其中,Broker创建的节点类型是临时节点,一旦Broker宕机,则对应的临时节点也会被自动删除。
Topic 注册: 在 Kafka 中,同一个Topic 的消息会被分成多个分区并将其分布在多个 Broker 上,这些分区信息及与 Broker 的对应关系也都是由 Zookeeper 在维护。比如我创建了一个名字为 my-topic 的主题并且它有两个分区,对应到 zookeeper 中会创建这些文件夹:
/brokers/topics/my-topic/Partitions/0
、/brokers/topics/my-topic/Partitions/1
负载均衡 :Kafka 通过给特定 Topic 指定多个 Partition, 而各个 Partition 可以分布在不同的 Broker 上, 这样便能提供比较好的并发能力。 对于同一个 Topic 的不同 Partition,Kafka 会尽力将这些 Partition 分布到不同的 Broker 服务器上。当生产者产生消息后也会尽量投递到不同 Broker 的 Partition 里面。当 Consumer 消费的时候,Zookeeper 可以根据当前的 Partition 数量以及 Consumer 数量来实现动态负载均衡。
之前,在消费者对指定消息分区进行消息消费的过程中,需要定时地将分区消息的消费进度Offset记录到Zookeeper上,以便在该消费者进行重启或者其他消费者重新接管该消息分区的消息消费后,能够从之前的进度开始继续进行消息消费。
消费进度Offset的理解
在消费者对指定消息分区进行消息消费的过程中,需要定时地将分区消息的消费进度Offset记录到Zookeeper上,以便在该消费者进行重启或者其他消费者重新接管该消息分区的消息消费后,能够从之前的进度开始继续进行消息消费。
现在是放在了一个叫做__consumer_offset 的topic 里,key是 consumerGroupId+topic+分区号,value就是当前offset的值。默认是50个分区,由consumer自己管理。
每个consumer是基于自己在commit log中的消费进度(offset)来进行工作的。
这意味kafka中的consumer对集群的影响是非常小的,添加一个或者减少一个consumer,对于集群或者其他consumer 来说,都是没有影响的,因为每个consumer维护各自的消费offset。
一般情况下我们按照顺序逐条消费commit log中的消息,当然我可以通过指定offset来重复消费某些消息, 或者跳过某些消息。
延迟队列的实现
- 发送延时消息时先把消息按照不同的延迟时间段发送到指定的队列中(topic_1s,topic_5s,topic_10s,…topic_2h,这个一 般不能支持任意时间段的延时)。
- consumer通过定时器进行轮训消费这些topic,查看消息是否到期,如果到期就把这个消息发送到具体业务处理的topic中。提交时需要带上offset,便于下次知道从哪里开始消费。
- 队列中消息越靠前的到期时间越早,具体来说就是定时器在一次消费过程中,对消息的发送时间做判断,看下是否延迟到对 应时间了,如果到了就转发,如果还没到这一次定时任务就可以提前结束了。
kafka 高性能的原因?
磁盘顺序读写:kafka消息不能修改以及不会从文件中间删除保证了磁盘顺序读,kafka的消息写入文件都是追加在文件末尾,
不会写入文件中的某个位置(随机写)保证了磁盘顺序写。
读写数据的批量batch处理以及压缩传输 。Kafka采用的数据压缩的方式,以时间换空间,通过cpu时间的增加来尽量的减少磁盘空间的占用和网络IO的传输量,Kafka中消息的压缩是发生在生产者和Broker端的。
- 在生产者端,消息发送的时候将消息进行压缩,可以通过参数来配置进行压缩的算法
- 在消费者端就需要对消息进行解压缩操作
数据传输的零拷贝
传统的IO流程,某台机器将一份数据(比如一个文件)通过网络传输到另外一台机器,主要包括 read 和 write 过程。
- read:把数据从磁盘读取到内核缓冲区,再拷贝到用户缓冲区
- write:先把用户缓冲区的数据写入到 socket缓冲区,最后写入网卡设备
需要经过四次拷贝和四次状态的转换
用户空间的应用程序通过read()函数,向操作系统发起IO调用,上下文从用户态到切换到内核态(切换1),然后再通过 DMA 控制器将数据从【磁盘文件】拷贝到【操作系统内核缓冲区】
CPU将数据从【内核缓冲区】拷贝到【用户缓冲区】,上下文从内核态转为用户态(切换2),read函数返回。
用户应用进程通过write函数,发起IO调用,上下文从用户态转为内核态(切换3),CPU将数据从【用户缓冲区】拷贝到【内核中Socket缓冲区】
MA控制器把数据从【内核中Socket缓冲区】拷贝到【内核中网卡的缓冲区】上下文从内核态切换回用户态(切换4),write函数返回
Kafka 使用sendfile 来实现零拷贝
- 数据从从【磁盘文件】拷贝到【操作系统内核缓冲区】后,直接让操作系统内核缓冲区的数据发送到网卡,跳过了两次CPU拷贝数据的步骤。减少了内核态和用户态上下文状态的切换
注:其实零拷贝,并不是说拷贝的次数为零,只是说没有cup拷贝的过程,这里的零拷贝指的是cpu拷贝次数为零,
什么是用户态、内核态?
- 如果进程运行于内核空间,被称为进程的内核态
- 如果进程运行于用户空间,被称为进程的用户态。
操作系统为每个进程都分配了内存空间,一部分是用户空间,一部分是内核空间。内核空间是操作系统内核访问的区域,是受保护的内存空间,而用户空间是用户应用程序访问的内存区域
设计一个MQ的思路
- 可伸缩性:需要的时候快速扩容,就可以增加吞吐量和容量。参照一下 kafka 的设计理念, broker -> topic -> partition,每个 partition 放一个机器,就存一部分数据。 如果现在资源不够了,给 topic 增加 partition。
- 持久化:mq 的数据要落地磁盘,参考kafka顺序写,顺序读。这样就没有磁盘随机读写的寻址开销,磁盘顺序读写的性能是很高的
- 高可用性:多副本 -> leader & follower,broker 挂了重新选举 leader
- 数据 0 丢失:参考:RabbitMQ的消息零丢失方案 - 楼上有只喵 (pyr9.github.io), kafka的消息零丢失方案 - 楼上有只喵 (pyr9.github.io)