kafka入门

1 kafka 是什么?

Kafka 是一个分布式,支持分区(partition), 多副本(replication),基于zookeeper 的分布式消息系统。它最大的特性就是可以实时的处理大数据量。

Kafka的使用场景:

  • 日志的收集:记录各种服务的log。,通过kafka以统一接口服务的方式开放给各种 consumer,例如hadoop、Hbase、Solr等。
  • 消息系统:解耦和生产者和消费者、缓存消息等。
  • 用户活动跟踪:Kafka经常被用来记录web用户或者app用户的各种活动,如浏览网页、搜索、点击等活动,这些活动信息被各个服务器发布到kafka的topic中,然后订阅者通过订阅这些topic来做实时的监控分析,或者装载到hadoop、数据仓库中做离线分析和掘。
  • 运营指标:Kafka也经常用来记录运营监控数据。包括收集各种分布式应用的数据,生产各种操作的集中反馈,比如报警和报告。

2 kafka 的整体设计

从一个较高的层面上来看,producer通过网络发送消息到Kafka集群,然后consumer来进行消费,如下图

服务端(brokers)和客户端(producer、consumer)之间通信通过TCP协议来完成。

image-20230228231340194

还有几个概念:

  • Producer: 消息生产者,向Broker发送消息的客户端 。

  • Broker: 消息中间件处理节点,一个Kafka实例就是一个broker,一个或者多个Broker可以组成一个Kafka集群。

  • Topic:Kafka根据topic对消息进行归类,发布到Kafka集群的每条消息都需要指定一个topic ,类似activeMq的queue

  • Partition: 一个topic可以分为多个partition,每个 partition内部消息是有序的.

  • ConsumerGroup:每个Consumer属于一个特定的Consumer Group,一条消息可以被多个不同的Consumer Group消费,但是一个

    Consumer Group中只能有一个Consumer能够消费该消息

  • Consumer: 消息消费者,从Broker读取消息的客户端

3 Kafka 的基本使用

  1. 启动zookeeper zookeeper特性与节点介绍 - 楼上有只喵 (pyr9.github.io)

  2. 下载并解压: Apache Kafka

    1
    cd kafka_2.12-3.2.1
  3. 修改配置文件config/server.properties,根据需要修改:

    1
    2
    3
    broker.id=0     //如果是单机安装则不用修改,如果是集群安装则要保证每个broker.id配置不同的值
    log.dirs=/Tools/kafka_2.13-2.4.1/logs //日志位置,该文件夹必须存在,否则启动时会报错
    zookeeper.connect=localhost:2181 //zookeeper的连接地址,多个地址用逗号分隔
  4. 启动服务

    1
    bin/kafka-server-start.sh -daemon  config/server.properties
    • server.properties的配置路径是一个强制的参数
    • ­daemon表示以后台进程运行

    我们进入zookeeper目录通过zookeeper客户端查看下zookeeper的目录树

    1
    2
    [zk: localhost:2181(CONNECTED) 2] ls /
    [admin, brokers, cluster, config, consumers, controller, controller_epoch, feature, isr_change_notification, latest_producer_id_block, log_dir_event_notification, zookeeper]

    查看kafka节点

    1
    2
    [zk: localhost:2181(CONNECTED) 3]  ls /brokers/ids
    [0]
  5. 停止服务

    1
    bin/kafka‐server‐stop.sh
  6. 创建主题

    1
    2
    ➜  kafka_2.12-3.2.1 bin/kafka-topics.sh --create --topic test --bootstrap-server localhost:9092
    Created topic test.
  7. 查看kafka中目前存在的topic

    1
    2
    ➜  kafka_2.12-3.2.1  bin/kafka-topics.sh --list --bootstrap-server localhost:9092
    test
  8. 发送消息

    1
    2
    3
    ➜  kafka_2.12-3.2.1 bin/kafka-console-producer.sh --topic test --bootstrap-server localhost:9092
    >my first event
    >my second event
  9. 消费消息

    1
    2
    3
    4
    ➜  kafka_2.12-3.2.1 bin/kafka-console-consumer.sh --topic test --from-beginning --bootstrap-server localhost:9092

    my first event
    my second event

    –from-beginning 可选,加上的话,就可以收到历史已经发送的数据。默认不加的话,是消费最新的消息

  10. 单播消费

    • 一条消息只能被某一个消费者消费的模式。类似queue模式。
    • 只需让所有消费者在同一个消费组里即可。

    分别在两个客户端执行如下消费命令,然后往主题里发送消息,结果只有一个客户端能收到消息 。

    1
    ➜  kafka_2.12-3.2.1 bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --consumer-property group.id=group1 --topic test
  11. 多播消费

    • 一条消息能被多个消费者消费的模式,类似publish-subscribe模式。
    • 针对Kafka同一条消息只能被同一个消费组下的某一个消费者消费的特性,要实现多播只要保证这些消费者属于不同的消费组即可。
    • 我们再增加一个消费者,该消费者属于group2消费组,结果两个客户端都能收到消息
    1
    ➜  kafka_2.12-3.2.1 bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --consumer-property group.id=group2 --topic test
  12. 查看消费组名

    1
    2
    3
    4
    ➜  kafka_2.12-3.2.1 bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
    testGroup1
    group2
    group1
  13. 查看消费组的消费偏移量

    1
    ➜  kafka_2.12-3.2.1 bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group group1

    image-20230228231400113

    • current-offset:当前消费组的已消费偏移量
    • log-end-offset:主题对应分区消息的结束偏移量(HW)
    • lag:当前消费组未消费的消息数

4 Topic 和 partition 详解

  • topic 是一个类别的名称,同类消息,发送到一个topic下,对于每一个topic可以有多个分区(partition)。

    image-20230228231408951

  • partition 是一个有序的message序列,这些message将按顺序添加到commitlog文件中。一个partition 对应一个commit log 文件,对应存放在配置的log文件中。

    1
    2
    3
    4
    5
    6
    7
    ➜  kafka_2.12-3.2.1 cd /tmp/kafka-logs
    ➜ kafka-logs cd test-0
    ➜ test-0 ls
    00000000000000000000.index 00000000000000000000.timeindex partition.metadata
    00000000000000000000.log leader-epoch-checkpoint
    ➜ test-0 cat 00000000000000000000.log
    F_:�����6����6�(my first eventG�����������*my second event>I=;���R����R�
  • 每个partion 中的消息都有一个唯一的编号,叫做offset,用来标识这条消息。

  • 创建多个分区的主题

    1
    2
    ➜  kafka_2.12-3.2.1 bin/kafka-topics.sh --create --topic test2 --bootstrap-server localhost:9092 ‐‐replication‐factor 1 ‐‐partitions 2
    Created topic test2.

5 Kafka副本集

  • Kafka副本集是指将日志复制多份,我们知道Kafka的数据是存储在日志文件中的,这就相当于数据的备份、冗余
  • Kafka可以为每个Topic设置副本集,Kafka中的Topic只是个逻辑概念,实际存储数据的是Partition,所以真正被复制的也是Partition,副本集是相对于Partition来说的
  • 一个Topic的副本集可以分布在多个Broker中,当一个Broker挂掉了,其他的Broker上还有数据,这就提高了数据的可靠性,这也是副本集的主要作用。

image-20230228231420153

  • 查看下topic的情况
1
➜  kafka_2.12-3.2.1 bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic test2

image-20230228231430788

  • leader节点负责给定partition的所有读写请求。
  • replicas 表示某个partition在哪几个broker上存在备份。不管这个几点是不是”leader“,甚至这个节点挂了,也会列出。
  • isr**(InSyncRepli)** :leader副本保持一定同步程度的副本(包括leader)组成ISR。是replicas的一个子集。

6 Kafka节点故障原因及处理方式

Kafka节点(Broker)故障的两种情况:

  • Kafka节点与Zookeeper心跳未保持视为节点故障
  • 当follower的消息落后于leader太多也会视为节点故障

Kafka对节点故障的处理方式:

  • Kafka会对故障节点进行移除,所以基本不会因为节点故障而丢失数据
  • Kafka的语义担保也很大程度上避免了数据丢失
  • Kafka会对消息进行集群内平衡,减少消息在某些节点热度过高

kafka入门
http://example.com/kafka入门/
作者
Panyurou
发布于
2022年8月13日
许可协议