KAFKA消息机制!

重复消费

consumer 在消费过程中,应用进程被强制kill掉或发生异常退出。

例如在一次poll500条消息后,消费到200条时,进程被强制kill消费到offset未提交,或出现异常退出导致消费到offset未提交。

下次重启时,依然会重新拉取500消息,造成之前消费到200条消息重复消费了两次。

消费者消费时间过长:

max.poll.interval.ms参数定义了两次poll的最大间隔,它的默认值是 5 分钟。

表示你的 Consumer 程序如果在 5 分钟之内无法消费完 poll 方法返回的消息。

那么 Consumer 会主动发起 离开组 的请求,Coordinator 也会开启新一轮 Rebalance。

因为上次消费的offset未提交,再次拉取的消息是之前消费过的消息,造成重复消费。

提高消费能力,提高单条消息的处理速度:根据实际场景max.poll.interval.ms值设置大一点,避免不必要的rebalance。

可适当减小max.poll.records的值,默认值是500,可根据实际消息速率适当调小。

消息丢失

消费者程序丢失数据

Consumer 程序从 Kafka 获取到消息后开启了多个线程异步处理消息,而 Consumer 程序自动地向前更新位移

假如某个线程运行失败了,它负责的消息没有被成功处理,但位移已经被更新了。

因此这条消息对于 Consumer 而言实际上是丢失了。

最佳配置:

不要使用 producer.send(msg),而要使用 producer.send(msg, callback)

设置 acks = all:

  • 设置成 all,则表明所有副本 Broker 都要接收到消息,该消息才算是 已提交。

设置 retries 为一个较大的值。

  • 当出现网络的瞬时抖动时,消息发送可能会失败,此时配置了retries > 0 的 Producer 能够自动重试消息发送,避免消息丢失。

设置 unclean.leader.election.enable = false

设置 replication.factor >= 3

  • 防止消息丢失的主要机制就是冗余。

设置 min.insync.replicas > 1

  • 控制的是消息至少要被写入到多少个副本才算是 已提交 。
  • 设置成大于 1 可以提升消息持久性。
  • 在实际环境中千万不要使用默认值 1。

确保 replication.factor > min.insync.replicas

  • 如果两者相等,那么只要有一个副本挂机,整个分区就无法正常工作了。

确保消息消费完成再提交。

  • Consumer 端有个参数 enable.auto.commit,最好把它设置成 false,并采用手动提交位移的方式。

消息顺序

乱序场景一

因为一个Topic可以有多个Partition,Kafka只能保证Partition内部有序。

  • 可以设置Topic 有且只有一个Partition

  • 根据业务需要,需要顺序的指定为同一个Partition

乱序场景二

对于同一业务进入了同一个消费者组之后,用了多线程来处理消息,会导致消息的乱序。

  • 消费者内部根据线程数量创建等量的内存队列,对于需要顺序的一系列业务数据
    • 根据key或者业务数据,放到同一个内存队列中,然后线程从对应的内存队列中取出并操作。

消息堆积

消息堆积原因

生产者短时间内生产大量消息到Topic,消费者无法及时消费。

消费者的消费能力不足(消费者并发低、消息处理时间长),导致消费效率低于生产效率。

消费者异常(如消费者故障、消费者网络异常等)导致无法消费消息。

Topic分区设置不合理,或新增分区无消费者消费。

Topic频繁重平衡导致消费效率降低。

处理方法

消费者端

  • 根据实际业务需求,合理增加消费者个数(消费并发度),确保分区数/消费者数=整数
    • 建议消费者数和分区数保持一致。
  • 提高消费者的消费速度
    • 通过优化消费者处理逻辑(减少复杂计算、第三方接口调用和读库操作),减少消费时间。
  • 增加消费者每次拉取消息的数量:
    • 拉取数据/处理时间 >= 生产速度。

生产者端

  • 生产消息时,给消息Key加随机后缀,使消息均衡分布到不同分区上。

  • 在实际业务场景中,为消息Key加随机后缀,会导致消息全局不保序

    • 需根据实际业务判断是否适合给消息Key加随机后缀。

服务端

  • 合理设置Topic的分区数,在不影响业务处理效率的情况下
    • 调大Topic的分区数量。
  • 当服务端出现消息堆积时,对生产者进行熔断,或将生产者的消息转发到其他Topic。