html tool

2023年3月20日星期一

转:kafka的offset区别

 https://blog.csdn.net/qianshangding0708/article/details/121506196

Kafka中的每个partition都由一系列有序的、不可变的消息组成,这些消息被连续的追加到partition中。partition中的每个消息都有一个连续的序号,用于partition唯一标识一条消息。

Offset记录着下一条将要发送给Consumer的消息的序号。

Offset从语义上来看拥有两种:Current Offset 和 Committed Offset。

Current Offset

Current Offset保存在Consumer客户端中,它表示Consumer希望收到的下一条消息的序号。它仅仅在poll()方法中使用。例如,Consumer第一次调用poll()方法后收到了20条消息,那么Current Offset就被设置为20。这样Consumer下一次调用poll()方法时,Kafka就知道应该从序号为21的消息开始读取。这样就能够保证每次Consumer poll消息时,都能够收到不重复的消息。

[popexizhi:consumer 客户端 开始工作后,自己记录的当前完成位置 !(Consumer 自己的草稿本自己每次工作中用)]

Committed Offset

Committed Offset保存在Broker上,它表示Consumer已经确认消费过的消息的序号。主要通过 commitSync 和 commitA‍sync
API来操作。举个例子,Consumer通过poll() 方法收到20条消息后,此时Current Offset就是20,经过一系列的逻辑处理后,并没有调用consumer.commitAsync()consumer.commitSync()来提交Committed Offset,那么此时Committed Offset依旧是0。

[popexizhi: broker上记录的consumer自己确认给合作着的自己完成的位置!(committed offset 是比较正式的工作节点记录,其他合作者都要使用,有历史性,每次consumer 的首次开始位置) ]

Committed Offset主要用于Consumer Rebalance。在Consumer Rebalance的过程中,一个partition被分配给了一个Consumer,那么这个Consumer该从什么位置开始消费消息呢?答案就是Committed Offset。另外,如果一个Consumer消费了5条消息(poll并且成功commitSync)之后宕机了,重新启动之后它仍然能够从第6条消息开始消费,因为Committed Offset已经被Kafka记录为5。

总结一下,Current Offset是针对Consumer的poll过程的,它可以保证每次poll都返回不重复的消息;而Committed Offset是用于Consumer Rebalance过程的,它能够保证新的Consumer能够从正确的位置开始消费一个partition,从而避免重复消费。



Group Coordinator

Group Coordinator是运行在Kafka集群中每一个Broker内的一个进程。它主要负责Consumer Group的管理,Offset位移管理以及Consumer Rebalance。

[popexizhi: Group Coordinator 是topic 与 Consumer 的真正沟通人,为topic 们介绍和管理着 Consumer Group ]

对于每一个Consumer Group,Group Coordinator都会存储以下信息:

  • 订阅的topics列表

  • Consumer Group配置信息,包括session timeout等

  • 组中每个Consumer的元数据。包括主机名,consumer id

  • 每个Group正在消费的topic partition的当前offsets

  • Partition的ownership元数据,包括consumer消费的partitions映射关系

Consumer Group如何确定自己的coordinator是谁呢?简单来说分为两步:

[popexizhi: Consumer Group 如何找自己的coordinator ]

1. 确定Consumer Group offset信息将要写入__consumers_offsets topic的哪个分区。具体计算公式:

__consumers_offsets partition# = Math.abs(groupId.hashCode() % offsets.topic.num.partitions)  //offsets.topic.num.partitions默认值为50。

2. 该分区leader所在的broker就是被选定的coordinator

没有评论:

发表评论