auto offset reset 参数配置定义了消费者在没有初始偏移量的情况下从topic分区消费的具体方法。 当消费者组新加入一个消费者时,并且当前消费者是首次监听此topic体已经被定义并且是第一次收听一个主题时, 这个配置会告诉组内的消费者是从分区的开头还是结尾读取数据。

消息消费

每个 Kafka 消费者都属于一个消费者组,由消费者的 group.id 配置分组。 一个消费者组将包含一个或多个消费者。 消费者组中的消费者将被分配到主题分区以消费他们的消息。 每个分区将只有一个消费者分配给它,尽管一个消费者可以分配给任何一个主题中的多个分区,并且类似地分配给它订阅的所有主题中的分区。

首次创建新的消费者组并将其消费者分配给主题分区时,必须决定从哪个点开始轮询消息。 除非消费者被告知从特定偏移量进行轮询(非典型情况),否则有两个主要选项。 首先,消费者可能会从分区的开头读取消息,处理分区日志中存在的每条消息。 第二种选择是仅在消费者开始监听后才消费写入该主题的新消息。

配置

当消费者组没有设置初始偏移量时,是从topic的分区的开头消费还是消费新消息是由 Kafka 消费者的 auto.offset.reset 配置参数控制。 下表显示了具体配置。

一旦消费者组写入了偏移量,则此配置参数将不再适用。 如果消费者组中的消费者被停止然后重新启动,他们将从最后一个偏移量开始消费。

从最早的地方开始消费

将新消费者配置为 auto.offset.reset: earliest 将导致从分配给它的主题分区开始的所有事件都被消费。 在以下示例中,主题分区有两条消息“foo”和“bar”,这些消息将被使用:

如果topic分区包含数百万条消息,请确认数据量并且知悉处理这些消息是否会使系统不堪重负。 这些消息可能追溯到数周或数月之前,也可能追溯到系统的开始,具体取决于Topic的消息保留期限。 retention.ms 设置为 -1 表示不会丢弃任何旧消息,因此将消费所有消息。

从最新的地方开始消费

将新消费者配置为 auto.offset.reset: latest 仅当新消息写入消费者分配的分区上时才会进行消费。 在上面的场景中,只会使用来自偏移量 (3) 的新消息。 将跳过现有消息“foo”和bar”。 所以消费者是否应该配置为跳过现有消息取决于自身的业务要求。

数据丢失

有一种极端情况可能会导致数据丢失,即在可重试的异常情况下不会重新传送消息。 此场景适用于尚未记录任何当前偏移量(或偏移量已被删除)的新消费者组。

  • 两个消费者实例 A 和 B 加入一个新的消费者组。
  • 消费者实例配置为 auto.offset.reset 作为最新(即仅限新消息)。
  • 消费者 A 使用主题分区中的新消息。
  • 消费者 A 在消息处理完成之前死亡。 消费者偏移量不会更新以将消息标记为已消费。
  • 消费者组重新平衡,消费者 B 被分配到主题分区。
  • 由于没有有效的偏移量,并且 auto.offset.reset 设置为最新,因此不会使用消息。

由于消费者 A 已阅读消息,因此期望在失败情况下将消息重新传递给下一个消费者以分配给主题分区。 然而,在这种情况下,这不会发生,并且消息实际上丢失了。

偏移量检查

每个消费者组都为每个主题分区存储其偏移量。 这些存储在 Kafka 内部主题 __consumer_offsets 中。 Apache Kafka 在其安装中提供了许多管理脚本,可用于查询代理状态和主题等。 为了更好地了解数据丢失情况下发生的情况,可以使用 kafka-consumer-groups 脚本来查询活动消费者组的偏移量状态。

假设一个名为 demo-consumer-group 的消费者组和主题 demo-topic 具有单个分区。 该分区已经写入了两条消息(“foo”和“bar”)。

脚本如下:

kafka /bin/sh /usr/bin/kafka-consumer-groups — bootstrap-server localhost:9092 — describe — group demo-consumer-group

结果:

这表明该分区有两条消息,因为 LOG-END-OFFSET 为 2。由于消费者组中的消费者已分配给该分区,但已将 auto.offset.reset 设置为最新,因此它不消费消息,并且 没有有效的偏移量集。 这反映在 CURRENT-OFFSET 值未设置上。 LAG 指的是消费者距离日志的尾部有多远。 在这种情况下,由于没有有效的偏移量,因此未设置 LAG。

在上面的数据丢失场景中,当处理第一条新消息时发生故障,LOG-END-OFFSET 移动到 3 并且 CURRENT-OFFSET 保持不变。 当消费者组重新平衡并且另一个消费者实例被分配给分区时,它因此也不会消费新消息。 它将等到下一条消息被写入。

一旦一条或多条消息被成功消费,消费者组就会有一个有效的 CURRENT-OFFSET,即使消费者已经停止收听消息。 当消费者实例在这种情况下重新启动时,它将始终从下一个偏移量开始,而不管 auto.offset.reset 是什么。 例如,这里的 CURRENT-OFFSET 是 1,LAG 显示它在日志尾部后面是 1。 因此,消费者将消费主题分区上的第二条消息。 CURRENT-OFFSET 将移动到 2,LAG 将移动到 0。

如果在消息被处理之前消费失败,并且新的消费者被分配到主题分区,那么由于存在有效的 CURRENT-OFFSET,消息将被重新消费,从而导致数据丢失。

测试

auto.offset.reset 设置为 latest 可能导致意外行为的一种情况是针对真实的 Kafka 实例进行集成测试时——可能在 docker 容器中启动。 如果测试启动应用程序并发送第一条消息,并期望接收应用程序生成的结果出站消息,就会发生这种情况。 但是,如果消费者组仍在执行其第一次重新平衡(可能需要数十秒),则在写入出站消息时消费者可能尚未准备好。 在这种情况下,将 auto.offset.reset 设置为 latest,消息将不会按预期使用。

结论

首次消费Topic分区的消费者可以配置为消费该主题的所有消息或仅消费新消息。至于哪种情况下应采用哪种设置取决于业务的要求。 如果设置为最早位置,则须考虑数据量和处理消息的压力以及对资源的影响。