消费模式

常见的消费模式有两种:

poll(拉):消费者主动向服务端拉取消息。

push(推):服务端主动推送消息给消费者。

由于推模式很难考虑到每个客户端不同的消费速率,导致消费者无法消费消息而宕机,因此kafka采用的是poll的模式,该模式有个缺点,如果服务端没有消息,消费端就会一直空轮询。为了避免过多不必要的空轮询,kafka做了改进,如果没消息服务端就会暂时保持该请求,在一段时间内有消息再回应给客户端。

消费工作流程

消费者总体工作流程

​ 消费者对消息进行消费,并且将已经消费的消息加入 _consumer_offsets 中。

image-20220903110040620

消费者组原理

Consumer Group(CG):消费者组,由多个consumer组成。形成一个消费者组的条件,是所有消费者的groupid相同。

  • 消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费。
  • 消费者组之间互不影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。

image-20220903103656937

​ 对于消息中间件而言,一般有两种消息投递模式:点对点(P2P, Point-to-Point)模式和发布/订阅(Pub/Sub)模式。点对点模式是基于队列的,消息生产者发送消息到队列,消息消费者从队列中接收消息。发布订阅模式定义了如何向一个内容节点发布和订阅消息,这个内容节点称为主题(Topic) , 主题可以认为是消息传递的中介,消息发布者将消息发布到某个主题, 而消息订阅者从主题中订阅消息。主题使得消息的订阅者和发布者互相保持独立,不需要进行 接触即可保证消息的传递,发布/订阅模式在消息的一对多广播时采用。Kafka同时支待两种消息投递模式,而这正是得益于消费者与消费组模型的契合:

  • 如果所有的消费者都隶属于同一个消费组,那么所有的消息都会被均衡地投递给每一个消费者,即每条消息只会被一个消费者处理,这就相当于点对点模式的应用。
  • 如果所有的消费者都隶属于不同的消费组,那么所有的消息都会被广播给所有的消费者,即每条消息会被所有的消费者处理,这就相当于发布/订阅模式的应用。

消费者组选举Leader

具体的消费者组初始化流程:

​ 通过对GroupId进行Hash得到那台服务器的coordinator ,coordinator负责选出消费组中的Leader ,并且协调信息。真正存储消费记录的是 _consumer_offsets_partition 。

image-20220903103841994

image-20220903112720568

image-20220903112925137

image-20220903112936658

消费者API

消费组单消费者以及消费者组多消费者

image-20220903113854959

注意:在消费者 API 代码中必须配置消费者组 id。命令行启动消费者不填写消费者组id 会被自动填写随机的消费者组 id。

public class CustomConsumer {
    public static void main(String[] args) {
        //0.配置信息
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "47.106.86.64:9092");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
 
        //1.创建消费者
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
        ArrayList<String> topic = new ArrayList<>();
        topic.add("first");
        kafkaConsumer.subscribe(topic);
 
 
        //2.消费信息
        while (true) {
            ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofSeconds(1));
            records.forEach(record -> {
                System.out.println(record);
            });
        }
        //3.关闭
    }
}

分区平衡以及再平衡

image-20220903114711746

参数名称 描述
heartbeat.interval.ms Kafka 消费者和 coordinator 之间的心跳时间,默认 3s。该条目的值必须小于 session.timeout.ms,也不应该高于session.timeout.ms 的 1/3。

session.timeout.ms Kafka 消费者和 coordinator 之间连接超时时间,默认 45s。超过该值,该消费者被移除,消费者组执行再平衡。

max.poll.interval.ms 消费者处理消息的最大时长,默认是 5 分钟。超过该值,该消费者被移除,消费者组执行再平衡。

partition.assignment.strategy 消 费 者 分 区 分 配 策 略 , 默 认 策 略 是 Range + CooperativeSticky。Kafka 可以同时使用多个分区分配策略。可 以 选 择 的 策 略 包 括 : Range 、 RoundRobin 、 Sticky 、CooperativeSticky (协作者粘性)

分区分配策略

​ 我们知道一个 Consumer Group 中有多个 Consumer,一个 Topic 也有多个 Partition,所以必然会涉及到 Partition 的分配问题: 确定哪个 Partition 由哪个 Consumer 来消费的问题。

Kafka 客户端提供了3 种分区分配策略:RangeAssignorRoundRobinAssignor 和 StickyAssignor,前两种 分配方案相对简单一些StickyAssignor分配方案相对复杂一些。

Range

image-20220903104029564

Range 分区分配再平衡案例
(1)停止掉 0 号消费者,快速重新发送消息观看结果(45s 以内,越快越好)。
1 号消费者:消费到 3、4 号分区数据。
2 号消费者:消费到 5、6 号分区数据。
0 号消费者的任务会整体被分配到 1 号消费者或者 2 号消费者。 (被整体分配)
说明:0 号消费者挂掉后,消费者组需要按照超时时间 45s 来判断它是否退出,所以需要等待,时间到了 45s 后,判断它真的退出就会把任务分配给其他 broker 执行。

(2)再次重新发送消息观看结果(45s 以后)。
1 号消费者:消费到 0、1、2、3 号分区数据。
2 号消费者:消费到 4、5、6 号分区数据。
说明:消费者 0 已经被踢出消费者组,所以重新按照 range 方式分配。

RoundRobin

image-20220903104000261

// 修改分区分配策略
properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "org.apache.kafka.clients.consumer.RoundRobinAssignor");

(1)停止掉 0 号消费者,快速重新发送消息观看结果(45s 以内,越快越好)。
1 号消费者:消费到 2、5 号分区数据
2 号消费者:消费到 4、1 号分区数据
0 号消费者的任务会按照 RoundRobin 的方式,把数据轮询分成 0 、6 和 3 号分区数据,分别由 1 号消费者或者 2 号消费者消费。(采用轮训)
说明:0 号消费者挂掉后,消费者组需要按照超时时间 45s 来判断它是否退出,所以需要等待,时间到了 45s 后,判断它真的退出就会把任务分配给其他 broker 执行。

(2)再次重新发送消息观看结果(45s 以后)。
1 号消费者:消费到 0、2、4、6 号分区数据
2 号消费者:消费到 1、3、5 号分区数据
说明:消费者 0 已经被踢出消费者组,所以重新按照 RoundRobin 方式分配。

Sticky

StickyAssignor 分区分配算法是 Kafka 客户端提供的分配策略中最复杂的一种,可以通过 partition.assignment.strategy 参数去设置,从 0.11 版本开始引入,目的就是在执行新分配时,尽量在上一次分配结果上少做调整,其主要实现了以下2个目标:

1)、Topic Partition 的分配要尽量均衡。

2)、当 Rebalance(重分配,后面会详细分析) 发生时,尽量与上一次分配结果保持一致。

该算法的精髓在于,重分配后,还能尽量与上一次结果保持一致,进而达到消费者故障下线,故障恢复后的均衡问题,在此就不举例了。

// 修改分区分配策略
properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "org.apache.kafka.clients.consumer.StickyAssignor");

offset位移提交

offset 的默认维护位置

​ Kafka 0.9 版本之前consumer默认将offset保存在Zookeeper中,从0.9版本之后consumer默认保存在Kafka一个内置的topic中,该topic为_consumer_offsets。

​ 消费者提交的offset值维护在__consumer_offsets这个Topic中,具体维护在哪个分区中,是由消费者所在的消费者组groupid决定,计算方式是:groupid的hashCode值对50取余。当kafka环境正常而消费者不能消费时,有可能是对应的__consumer_offsets分区leader为none或-1,或者分区中的日志文件损坏导致。

​ __consumer_offsets 主题里面采用 key 和 value 的方式存储数据。key 是 group.id+topic+ 分区号,value 就是当前 offset 的值。每隔一段时间,kafka 内部会对这个 topic 进行 compact,也就是每个 group.id+topic+分区号就保留最新数据。

​ 一般情况下, 当集群中第一次有消费者消费消息时会自动创建主题_ consumer_ offsets, 不过它的副本因子还受offsets.topic .replication.factor参数的约束,这个参数的默认值为3 (下载安装的包中此值可能为1),分区数可以通过offsets.topic.num.partitions参数设置,默认为50。

​ 在配置文件 config/consumer.properties 中添加配置 exclude.internal.topics=false,默认是 true,表示不能消费系统主题。为了查看该系统主题数据,所以该参数修改为 false。

kafka-console-consumer.sh --topic __consumer_offsets --bootstrap-server 47.106.86.64:9092 --consumer.config config/consumer.properties --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --from-beginning
[offset,atguigu,1]::OffsetAndMetadata(offset=7, 
leaderEpoch=Optional[0], metadata=, commitTimestamp=1622442520203, 
expireTimestamp=None)
[offset,atguigu,0]::OffsetAndMetadata(offset=8, 
leaderEpoch=Optional[0], metadata=, commitTimestamp=1622442520203, 
expireTimestamp=None)

消费者提交offset的方式有两种,自动提交手动提交

自动提交

为了使我们能够专注于自己的业务逻辑,Kafka提供了自动提交offset的功能。

image-20220903122129755

  • enable.auto.commit:是否开启自动提交offset功能,默认是true
  • auto.commit.interval.ms:自动提交offset的时间间隔,默认是5s

自动提交有可能出现消息消费失败,但是却提交了offset的情况,导致消息丢失。为了能够实现消息消费offset的精确控制,更推荐手动提交。

// 自动提交
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
// 提交时间间隔
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,1000);

手动提交

虽然自动提交offset十分简单便利,但由于其是基于时间提交的,开发人员难以把握offset提交的时机。因 此Kafka还提供了手动提交offset的API。手动提交offset的方法有两种:分别是commitSync(同步提交)和commitAsync(异步提交)。两者的相同点是,都会将本次提交的一批数据最高的偏移量提交;不同点是,同步提交阻塞当前线程,一直到提交成功,并且会自动失败重试(由不可控因素导致,也会出现提交失败);而异步提交则没有失败重试机制,故有可能提交失败

  • commitSync(同步提交):必须等待offset提交完毕,再去消费下一批数据。 阻塞线程,一直到提交到成功,会进行失败重试
  • commitAsync(异步提交) :发送完提交offset请求后,就开始消费下一批数据了。没有失败重试机制,会提交失败

image-20220903122747576

指定消费位置

在kafka中当消费者查找不到所记录的消费位移时,会根据auto.offset.reset的配置,决定从何处消费。

auto.offset.reset = earliest | latest | none 默认是 latest。

  • earliest:自动将偏移量重置为最早的偏移量,–from-beginning。
  • latest(默认值):自动将偏移量重置为最新偏移量
  • none:如果未找到消费者组的先前偏移量,则向消费者抛出异常。

image-20220903123232954

​ Kafka中的消费位移是存储在一个内部主题中的, 而我们可以使用**seek()**方法可以突破这一限制:消费位移可以保存在任意的存储介质中, 例如数据库、 文件系统等。以数据库为例, 我们将消费位移保存在其中的一个表中, 在下次消费的时候可以读取存储在数据表中的消费位移并通过seek()方法指向这个具体的位置 。

//配置信息
Properties properties = new Properties();
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");

指定位移消费

// 指定位置进行消费
Set<TopicPartition> assignment = kafkaConsumer.assignment();
 
//  保证分区分配方案已经制定完毕
while (assignment.size() == 0){
    kafkaConsumer.poll(Duration.ofSeconds(1));
    assignment = kafkaConsumer.assignment();
}
 
// 指定消费的offset
for (TopicPartition topicPartition : assignment) {
    kafkaConsumer.seek(topicPartition,600);
}
 
// 3  消费数据
while (true){
 
    ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
 
    for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
        System.out.println(consumerRecord);
    }
}

指定时间消费

​ 原理就是查到时间对应的offset再去指定位移消费,为了确保同步到分区信息,我们还需要确保能获取到分区,再去查询分区时间

// 指定位置进行消费
Set<TopicPartition> assignment = kafkaConsumer.assignment();
 
//  保证分区分配方案已经制定完毕
while (assignment.size() == 0){
    kafkaConsumer.poll(Duration.ofSeconds(1));
 
    assignment = kafkaConsumer.assignment();
}
 
// 希望把时间转换为对应的offset
HashMap<TopicPartition, Long> topicPartitionLongHashMap = new HashMap<>();
 
// 封装对应集合
for (TopicPartition topicPartition : assignment) {
    topicPartitionLongHashMap.put(topicPartition,System.currentTimeMillis() - 1 * 24 * 3600 * 1000);
}
 
Map<TopicPartition, OffsetAndTimestamp> topicPartitionOffsetAndTimestampMap = kafkaConsumer.offsetsForTimes(topicPartitionLongHashMap);
 
// 指定消费的offset
for (TopicPartition topicPartition : assignment) {
 
    OffsetAndTimestamp offsetAndTimestamp = topicPartitionOffsetAndTimestampMap.get(topicPartition);
 
    kafkaConsumer.seek(topicPartition,offsetAndTimestamp.offset());
}
 
// 3  消费数据
while (true){
 
    ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
 
    for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
 
        System.out.println(consumerRecord);
    }
}

漏消费和重复消费

重复消费:已经消费了数据,但是 offset 没提交。
漏消费:先提交 offset 后消费,有可能会造成数据的漏消费。

image-20220903123840273

消费者事务

image-20220903123934358

数据积压(提高吞吐量)

image-20220903124022229

参数名称 描述
fetch.max.bytes 默认Default: 52428800(50 m)。消费者获取服务器端一批消息最大的字节数。如果服务器端一批次的数据大于该值(50m)仍然可以拉取回来这批数据,因此,这不是一个绝对最大值。一批次的大小受 message.max.bytes (broker config)ormax.message.bytes (topic config)影响。

max.poll.records 一次 poll 拉取数据返回消息的最大条数,默认是 500 条

拦截器

与生产者对应,消费者也有拦截器。我们来看看拦截器具体的方法。

public interface ConsumerInterceptor<K, V> extends Configurable, AutoCloseable {
 
    ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records);
 
    void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets);
 
    void close();
}

​ Kafka Consumer会在poll()方法返回之前调用拦截器的onConsume()方法来对消息进行相应的定制化操作,比如修改返回的消息内容按照某种规则过滤消息(可能会减少poll()方法返回 的消息的个数)。如果onConsume()方法中抛出异常, 那么会被捕获并记录到日志中, 但是异常不会再向上传递。

​ Kafka Consumer会在提交完消费位移之后调用拦截器的**onCommit()**方法, 可以使用这个方法来记录跟踪所提交的位移信息,比如当消费者使用commitSync的无参方法时,我们不知道提交的消费位移的具体细节, 而使用拦截器的onCommit()方法却可以做到这 一点。