KAFKA消息机制!
KAFKA消息机制!
月伴飞鱼重复消费
consumer 在消费过程中,应用进程被强制kill掉或发生异常退出。
例如在一次poll500条消息后,消费到200条时,进程被强制kill消费到offset未提交,或出现异常退出导致消费到offset未提交。
下次重启时,依然会重新拉取500消息,造成之前消费到200条消息重复消费了两次。
消费者消费时间过长:
max.poll.interval.ms
参数定义了两次poll的最大间隔,它的默认值是 5 分钟。表示你的 Consumer 程序如果在 5 分钟之内无法消费完 poll 方法返回的消息。
那么 Consumer 会主动发起 离开组 的请求,Coordinator 也会开启新一轮 Rebalance。
因为上次消费的offset未提交,再次拉取的消息是之前消费过的消息,造成重复消费。
提高消费能力,提高单条消息的处理速度:根据实际场景
max.poll.interval.ms
值设置大一点,避免不必要的rebalance。可适当减小
max.poll.records
的值,默认值是500,可根据实际消息速率适当调小。
消息丢失
消费者程序丢失数据
Consumer 程序从 Kafka 获取到消息后开启了多个线程异步处理消息,而 Consumer 程序自动地向前更新位移。
假如某个线程运行失败了,它负责的消息没有被成功处理,但位移已经被更新了。
因此这条消息对于 Consumer 而言实际上是丢失了。
最佳配置:
不要使用
producer.send(msg)
,而要使用producer.send(msg, callback)
。设置 acks = all:
- 设置成 all,则表明所有副本 Broker 都要接收到消息,该消息才算是 已提交。
设置 retries 为一个较大的值。
- 当出现网络的瞬时抖动时,消息发送可能会失败,此时配置了
retries > 0
的 Producer 能够自动重试消息发送,避免消息丢失。设置
unclean.leader.election.enable = false
。设置
replication.factor >= 3
。
- 防止消息丢失的主要机制就是冗余。
设置
min.insync.replicas > 1
。
- 控制的是消息至少要被写入到多少个副本才算是 已提交 。
- 设置成大于 1 可以提升消息持久性。
- 在实际环境中千万不要使用默认值 1。
确保
replication.factor > min.insync.replicas
。
- 如果两者相等,那么只要有一个副本挂机,整个分区就无法正常工作了。
确保消息消费完成再提交。
- Consumer 端有个参数
enable.auto.commit
,最好把它设置成 false,并采用手动提交位移的方式。
消息顺序
乱序场景一
因为一个
Topic
可以有多个Partition
,Kafka只能保证Partition
内部有序。
可以设置
Topic
有且只有一个Partition
。根据业务需要,需要顺序的指定为同一个
Partition
。
乱序场景二
对于同一业务进入了同一个消费者组之后,用了多线程来处理消息,会导致消息的乱序。
- 消费者内部根据线程数量创建等量的内存队列,对于需要顺序的一系列业务数据
- 根据
key
或者业务数据,放到同一个内存队列中,然后线程从对应的内存队列中取出并操作。
消息堆积
消息堆积原因
生产者短时间内生产大量消息到
Topic
,消费者无法及时消费。消费者的消费能力不足(消费者并发低、消息处理时间长),导致消费效率低于生产效率。
消费者异常(如消费者故障、消费者网络异常等)导致无法消费消息。
Topic
分区设置不合理,或新增分区无消费者消费。
Topic
频繁重平衡导致消费效率降低。
处理方法
消费者端
- 根据实际业务需求,合理增加消费者个数(消费并发度),确保分区数/消费者数=整数
- 建议消费者数和分区数保持一致。
- 提高消费者的消费速度
- 通过优化消费者处理逻辑(减少复杂计算、第三方接口调用和读库操作),减少消费时间。
- 增加消费者每次拉取消息的数量:
- 拉取数据/处理时间 >= 生产速度。
生产者端
生产消息时,给消息
Key
加随机后缀,使消息均衡分布到不同分区上。在实际业务场景中,为消息Key加随机后缀,会导致消息全局不保序
- 需根据实际业务判断是否适合给消息Key加随机后缀。
服务端
- 合理设置
Topic
的分区数,在不影响业务处理效率的情况下
- 调大Topic的分区数量。
- 当服务端出现消息堆积时,对生产者进行熔断,或将生产者的消息转发到其他Topic。