消费者
分区的所有权从一个消费者转移到另一个消费者,这样的行为被称为再均衡
消费者通过向被指派为群组协调器的 broker(不同的群组可以有不同的协调器)发送心跳来维持它们和群组的从属关系以及它们对分区的所有权关系
Properties props = new Properties();
//kafka 集群,broker-list
props.put("bootstrap.servers", "172.24.211.140:9092");
props.put("group.id", "consumer1");
props.put("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String,
String>(props);
consumer.subscribe(List.of("test"));
while(true){
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
System.err.println(record);
}
}
消费方式:
采用 pull(拉)模式从 broker 中读取数据
相较于服务器 push 模式,pull模式使消费者可以根据自己的情况来决定消费的速率,另一方面,pull模式每次也可以批量拉取数据,提高服务器吞吐量,而pull模式一旦有消息产生,就会立马推送,这意味着网络开销会比pull模式高一些。
但pull模式也有一些缺陷,如果没有消息的话,客户端就会在那里空转等待,所以Kafka为消费者的poll方法指定了一个时间参数,避免空转浪费CPU资源
配置
- fetch.min.bytes
- 指定了消费者从服务器获取记录的最小字节数
- fetch.max.wait.ms
- 指定 broker 的等待时间
- max.partition.fetch.bytes
- 指定了服务器从每个分区里返回给消费者的最大字节数
- session.timeout.ms
- 指定了消费者在被认为死亡之前可以与服务器断开连接的时间
- auto.offset.reset
- 指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下(因消费者长时间失效,包含偏移量的记录已经过时并被删除)该作何处理
- earliest 当分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
- latest 当分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
- none 当该topic下所有分区中存在未提交的offset时,抛出异常
- enable.auto.commit
- 指定了消费者是否自动提交偏移量
- partition.assignment.strategy 决定哪些分区应该被分配给哪个消费者
- range:该策略会把主题的若干个连续的分区分配给消费者
- 轮询:该策略把主题的所有分区逐个分配给消费者
- client.id
- max.poll.records 控制单次调用 call() 方法能够返回的记录数量
- receive.buffer.bytes 和 send.buffer.bytes
- 读写数据时用到的 TCP 缓冲区也可以设置大小
偏移量
每一个消费者组都会对有进行消费的主题分区的偏移量进行记录,这也就意味着消费者可以手动设置这个偏移量从头消费数据。
更新分区当前偏移量的操作叫作提交
Kafka 0.9 版本之前,consumer 默认将 offset 保存在 Zookeeper 中,从 0.9 版本开始,consumer 默认将 offset 保存在 Kafka 一个内置的 topic 中,该 topic为__consumer_offsets。
自动提交:
- 如果 enable.auto.commit 被设为 true ,那么每过 5s,消费者会自动把从 poll() 方法接收到的最大偏移量提交上去
手动提交:
把 auto.commit.offset 设为 false ,使用 commitSync()
异步提交 commitAsync() , 但该方法在发生错误时不会进行重试
再均衡监听:
订阅的时候传入 ConsumerRebalanceListener 实现相关接口
从特定偏移量开始处理:seekToBeginning(..)
读取特定偏移量:seek(..)
Kafka 消费异常没有手动提交导致的问题
假设本次偏移量消费到28 抛出异常 没有手动提交偏移量
该消费者下次迭代也会从29开始消费 为解决这个问题 所以只能新起消费者来替代此消费者
退出
其他线程调用consumer.wakeup()
会使consumer在poll抛出异常 然后进行close即可
没有群组的消费者
调用assign为其设置消费的分区