Kafka消费消息丢失排查,原因竟是groupId重复

Kafka消费消息丢失排查,原因竟是groupId重复

现象

BI的同事发现某指标数据展示有问题,发现最近入库的数据缺失,然后反馈到DBA. 经DBA排查后发现原始数据缺少.

排查

  1. 之前笔者在休假,同事初步排查怀疑是消息阻塞导致.经过代码调整发版之后发现还是有情况发生.
  2. 笔者接手之后,在本地打印指定点位的消息,发现没有丢失消息的情况.(15分钟一条消息)
    于是在线上系统中添加了打印指定点位的日志.(发版下班)
  3. 第二天,查看日志发现有缺失情况,本地打印继续开启 发现没有复现
    于是查询消费组,收集到一组 host(ip)
@Test
    public void showGroupInfo() throws ExecutionException, InterruptedException {
        String id = "kafka消费组id";
        DescribeConsumerGroupsResult describeConsumerGroupsResult = admin.describeConsumerGroups(Collections.singleton(id));
        final KafkaFuture<Map<String, ConsumerGroupDescription>> all = describeConsumerGroupsResult.all();
        final Map<String, ConsumerGroupDescription> stringConsumerGroupDescriptionMap = all.get();
        final Set<Map.Entry<String, ConsumerGroupDescription>> entries1 = stringConsumerGroupDescriptionMap.entrySet();
        for (Map.Entry<String, ConsumerGroupDescription> stringConsumerGroupDescriptionEntry : entries1) {
            final ConsumerGroupDescription value = stringConsumerGroupDescriptionEntry.getValue();
            final Collection<MemberDescription> members = value.members();
            for (MemberDescription member : members) {
                String s1 = member.consumerId();
                String host = member.host();
                String s = member.clientId();
                String s2 = member.assignment().toString();
                System.out.printf("clientId[%s],memberId[%s],host[%s],assignment[%s]%n", s, s1, host, s2);
            }
        }
    }
  1. 找运维同事查看ip之后发现了另一个项目,拉取jar反编译之后发现配置文件中kafka 消费者 groupId 配置相同
  2. 反馈相关项目负责人

疑问

如何统一管理/监控 kafka group 划分 避免此类问题发生?
kafka groupId 是否要按照微服务项目来划分?

原文地址:https://www.cnblogs.com/zzzz-yh/archive/2022/08/10/16571672.html