详解Kafka中的消费与心跳

Kafka是通过心跳机制来控制消费超时,心跳机制对于消费者客户端来说是无感的,它是一个异步线程,当我们启动一个消费者实例时,心跳线程就开始工作了。心跳超时会导致消息重复消费。

1、Kafka消费

首先,我们来看看消费。Kafka提供了非常简单的消费API,使用者只需初始化Kafka的Broker Server地址,然后实例化KafkaConsumer类即可拿到Topic中的数据。一个简单的Kafka消费实例代码如下所示:

public class JConsumerSubscribe extends Thread {
   public static void main(String[] args) {        JConsumerSubscribe jconsumer = new JConsumerSubscribe();        jconsumer.start();    }    /** 初始化Kafka集群信息. */    private Properties configure() {        Properties props = new Properties();        props.put("bootstrap.servers""dn1:9092,dn2:9092,dn3:9092");// 指定Kafka集群地址
       props.put("group.id""ke");// 指定消费者组
       props.put("enable.auto.commit""true");// 开启自动提交
       props.put("auto.commit.interval.ms""1000");// 自动提交的时间间隔
       // 反序列化消息主键        props.put("key.deserializer""org.apache.kafka.common.serialization.StringDeserializer");
       // 反序列化消费记录        props.put("value.deserializer""org.apache.kafka.common.serialization.StringDeserializer");
       return props;
   }    /** 实现一个单线程消费者. */    @Override    public void run() {        // 创建一个消费者实例对象        KafkaConsumer
  
    consumer = new KafkaConsumer(configure());        // 订阅消费主题集合        consumer.subscribe(Arrays.asList(
   "test_kafka_topic"));        // 实时消费标识        boolean flag = 
   true;        
   while (flag) {            // 获取主题消息数据            ConsumerRecords
   
     records = consumer.poll(Duration.ofMillis(100));            
    for (ConsumerRecord
    
      record : records)                // 循环打印消息记录                System.out.printf(
     "offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());        }        // 出现异常关闭消费者对象        consumer.close();    }} 
    
   
  

上述代码我们就可以非常便捷的拿到Topic中的数据。但是,当我们调用poll方法拉取数据的时候,Kafka Broker Server做了那些事情。接下来,我们可以去看看源代码的实现细节。核心代码如下: org.apache.kafka.clients.consumer.KafkaConsumer

private ConsumerRecords
  
    poll(final long timeoutMs, final boolean includeMetadataInTimeout) {        acquireAndEnsureOpen();        try {            
   if (timeoutMs "Timeout must not be negative");            
   if (this.subscriptions.hasNoSubscriptionOrUserAssignment()) {                throw new IllegalStateException(
   "Consumer is not subscribed to any topics or assigned any partitions");            }            // poll 
   for new data until the timeout expires            long elapsedTime = 0L;            
   do {                client.maybeTriggerWakeup();                final long metadataEnd;                
   if (includeMetadataInTimeout) {                    final long metadataStart = time.milliseconds();                    
   if (!updateAssignmentMetadataIfNeeded(remainingTimeAtLeastZero(timeoutMs, elapsedTime))) {                        
   return ConsumerRecords.empty();                    }                    metadataEnd = time.milliseconds();                    elapsedTime += metadataEnd - metadataStart;                } 
   else {                    
   while (!updateAssignmentMetadataIfNeeded(Long.MAX_VALUE)) {                        log.warn(
   "Still waiting for metadata");                    }                    metadataEnd = time.milliseconds();                }                final Map
   
    >> records = pollForFetches(remainingTimeAtLeastZero(timeoutMs, elapsedTime));                
    if (!records.isEmpty()) {                    // before returning the fetched records, we can send off the next round of fetches                    // and avoid block waiting 
    for their responses to 
    enable pipelining 
    while the user                    // is handling the fetched records.                    //                    // NOTE: since the consumed position has already been updated, we must not allow                    // wakeups or any other errors to be triggered prior to returning the fetched records.                    
    if (fetcher.sendFetches() > 0 || client.hasPendingRequests()) {                        client.pollNoWakeup();                    }                    
    return this.interceptors.onConsume(new ConsumerRecords(records));                }                final long fetchEnd = time.milliseconds();                elapsedTime += fetchEnd - metadataEnd;            } 
    while (elapsedTime return ConsumerRecords.empty();        } finally {            release();        }    } 
   
  

上述代码中有个方法pollForFetches,它的实现逻辑如下:

private Map
  
   >> pollForFetches(final long timeoutMs) {        final long startMs = time.milliseconds();        long pollTimeout = Math.min(coordinator.timeToNextPoll(startMs), timeoutMs);        // 
   if data is available already, 
   return it immediately        final Map
   
    >> records = fetcher.fetchedRecords();        
    if (!records.isEmpty()) {            
    return records;        }        // send any new fetches (won
    't resend pending fetches)        fetcher.sendFetches();        // We do not want to be stuck blocking in poll if we are missing some positions        // since the offset lookup may be backing off after a failure        // NOTE: the use of cachedSubscriptionHashAllFetchPositions means we MUST call        // updateAssignmentMetadataIfNeeded before this method.        if (!cachedSubscriptionHashAllFetchPositions && pollTimeout > retryBackoffMs) {            pollTimeout = retryBackoffMs;        }        client.poll(pollTimeout, startMs, () -> {            // since a fetch might be completed by the background thread, we need this poll condition            // to ensure that we do not block unnecessarily in poll()            return !fetcher.hasCompletedFetches();        });        // after the long poll, we should check whether the group needs to rebalance        // prior to returning data so that the group can stabilize faster        if (coordinator.rejoinNeededOrPending()) {            return Collections.emptyMap();        }        return fetcher.fetchedRecords();    } 

上述代码中加粗的位置,我们可以看出每次消费者客户端拉取数据时,通过poll方法,先调用fetcher中的fetchedRecords函数,如果获取不到数据,就会发起一个新的sendFetches请求。而在消费数据的时候,每个批次从Kafka Broker Server中拉取数据是有最大数据量限制,默认是500条,由属性(max.poll.records)控制,可以在客户端中设置该属性值来调整我们消费时每次拉取数据的量。

提示:这里需要注意的是,max.poll.records返回的是一个poll请求的数据总和,与多少个分区无关。因此,每次消费从所有分区中拉取Topic的数据的总条数不会超过max.poll.records所设置的值。

而在Fetcher的类中,在sendFetches方法中有限制拉取数据容量的限制,由属性(max.partition.fetch.bytes),默认1MB。可能会有这样一个场景,当满足max.partition.fetch.bytes限制条件,如果需要Fetch出10000条记录,每次默认500条,那么我们需要执行20次才能将这一次通过网络发起的请求全部Fetch完毕。

这里,可能有同学有疑问,我们不能将默认的max.poll.records属性值调到10000吗?可以调,但是还有个属性需要一起配合才可以,这个就是每次poll的超时时间(Duration.ofMillis(100)),这里需要根据你的实际每条数据的容量大小来确定设置超时时间,如果你将最大值调到10000,当你每条记录的容量很大时,超时时间还是100ms,那么可能拉取的数据少于10000条。

而这里,还有另外一个需要注意的事情,就是会话超时的问题。session.timeout.ms默认是10s,group.min.session.timeout.ms默认是6s,group.max.session.timeout.ms默认是30min。当你在处理消费的业务逻辑的时候,如果在10s内没有处理完,那么消费者客户端就会与Kafka Broker Server断开,消费掉的数据,产生的offset就没法提交给Kafka,因为Kafka Broker Server此时认为该消费者程序已经断开,而即使你设置了自动提交属性,或者设置auto.offset.reset属性,你消费的时候还是会出现重复消费的情况,这就是因为session.timeout.ms超时的原因导致的。

2、心跳机制

上面在末尾的时候,说到会话超时的情况导致消息重复消费,为什么会有超时?有同学会有这样的疑问,我的消费者线程明明是启动的,也没有退出,为啥消费不到Kafka的消息呢?消费者组也查不到我的ConsumerGroupID呢?这就有可能是超时导致的,而Kafka是通过心跳机制来控制超时,心跳机制对于消费者客户端来说是无感的,它是一个异步线程,当我们启动一个消费者实例时,心跳线程就开始工作了。

在org.apache.kafka.clients.consumer.internals.AbstractCoordinator中会启动一个HeartbeatThread线程来定时发送心跳和检测消费者的状态。每个消费者都有个org.apache.kafka.clients.consumer.internals.ConsumerCoordinator,而每个ConsumerCoordinator都会启动一个HeartbeatThread线程来维护心跳,心跳信息存放在org.apache.kafka.clients.consumer.internals.Heartbeat中,声明的Schema如下所示:

private final int sessionTimeoutMs;
   private final int heartbeatIntervalMs;
   private final int maxPollIntervalMs;
   private final long retryBackoffMs;
   private volatile long lastHeartbeatSend;  
   private long lastHeartbeatReceive;
   private long lastSessionReset;
   private long lastPoll;
   private boolean heartbeatFailed;

心跳线程中的run方法实现代码如下:

public void run() {
           try {
               log.debug("Heartbeat thread started");
               while (true) {
                   synchronized (AbstractCoordinator.this) {
                       if (closed)
                           return;
                       if (!enabled) {
                           AbstractCoordinator.this.wait();
                           continue;
                       }                        if (state != MemberState.STABLE) {
                           // the group is not stable (perhaps because we left the group or because the coordinator
                           // kicked us out), so disable heartbeats and wait for the main thread to rejoin.
                           disable();
                           continue;
                       }
                       client.pollNoWakeup();
                       long now = time.milliseconds();
                       if (coordinatorUnknown()) {
                           if (findCoordinatorFuture != null || lookupCoordinator().failed())
                               // the immediate future check ensures that we backoff properly in the case that no
                               // brokers are available to connect to.
                               AbstractCoordinator.this.wait(retryBackoffMs);
                       } else if (heartbeat.sessionTimeoutExpired(now)) {
                           // the session timeout has expired without seeing a successful heartbeat, so we should
                           // probably make sure the coordinator is still healthy.
                           markCoordinatorUnknown();
                       } else if (heartbeat.pollTimeoutExpired(now)) {
                           // the poll timeout has expired, which means that the foreground thread has stalled
                           // in between calls to poll(), so we explicitly leave the group.
                           maybeLeaveGroup();
                       } else if (!heartbeat.shouldHeartbeat(now)) {
                           // poll again after waiting for the retry backoff in case the heartbeat failed or the
                           // coordinator disconnected
                           AbstractCoordinator.this.wait(retryBackoffMs);
                       } else {
                           heartbeat.sentHeartbeat(now);
                           sendHeartbeatRequest().addListener(new RequestFutureListener() {
                               @Override
                               public void onSuccess(Void value) {
                                   synchronized (AbstractCoordinator.this) {
                                       heartbeat.receiveHeartbeat(time.milliseconds());
                                   }
                               }
                               @Override
                               public void onFailure(RuntimeException e) {
                                   synchronized (AbstractCoordinator.this) {
                                       if (e instanceof RebalanceInProgressException) {
                                           // it is valid to continue heartbeating while the group is rebalancing. This
                                           // ensures that the coordinator keeps the member in the group for as long
                                           // as the duration of the rebalance timeout. If we stop sending heartbeats,
                                           // however, then the session timeout may expire before we can rejoin.
                                           heartbeat.receiveHeartbeat(time.milliseconds());
                                       } else {
                                           heartbeat.failHeartbeat();
                                           // wake up the thread if it's sleeping to reschedule the heartbeat                                            AbstractCoordinator.this.notify();                                        }                                    }                                }                            });                        }                    }                }            } catch (AuthenticationException e) {                log.error("An authentication error occurred in the heartbeat thread", e);                this.failed.set(e);            } catch (GroupAuthorizationException e) {                log.error("A group authorization error occurred in the heartbeat thread", e);                this.failed.set(e);            } catch (InterruptedException | InterruptException e) {                Thread.interrupted();                log.error("Unexpected interrupt received in heartbeat thread", e);                this.failed.set(new RuntimeException(e));            } catch (Throwable e) {                log.error("Heartbeat thread failed due to unexpected error", e);                if (e instanceof RuntimeException)                    this.failed.set((RuntimeException) e);                else                    this.failed.set(new RuntimeException(e));            } finally {                log.debug("Heartbeat thread has closed");            }        } 

在心跳线程中这里面包含两个最重要的超时函数,它们是sessionTimeoutExpired和pollTimeoutExpired。

public boolean sessionTimeoutExpired(long now) {
       return now - Math.max(lastSessionReset, lastHeartbeatReceive) > sessionTimeoutMs;
}public boolean pollTimeoutExpired(long now) {
       return now - lastPoll > maxPollIntervalMs;
}

2.1、sessionTimeoutExpired

如果是sessionTimeout超时,则会被标记为当前协调器处理断开,此时,会将消费者移除,重新分配分区和消费者的对应关系。在Kafka Broker Server中,Consumer Group定义了5中(如果算上Unknown,应该是6种状态)状态,org.apache.kafka.common.ConsumerGroupState,如下图所示:

2.2、pollTimeoutExpired

如果触发了poll超时,此时消费者客户端会退出ConsumerGroup,当再次poll的时候,会重新加入到ConsumerGroup,触发RebalanceGroup。而KafkaConsumer Client是不会帮我们重复poll的,需要我们自己在实现的消费逻辑中不停的调用poll方法。

3.分区与消费线程

关于消费分区与消费线程的对应关系,理论上消费线程数应该小于等于分区数。之前是有这样一种观点,一个消费线程对应一个分区,当消费线程等于分区数是最大化线程的利用率。直接使用KafkaConsumer Client实例,这样使用确实没有什么问题。但是,如果我们有富裕的CPU,其实还可以使用大于分区数的线程,来提升消费能力,这就需要我们对KafkaConsumer Client实例进行改造,实现消费策略预计算,利用额外的CPU开启更多的线程,来实现消费任务分片。

文章来源网络,作者:运维,如若转载,请注明出处:https://shuyeidc.com/wp/205951.html<

(0)
运维的头像运维
上一篇2025-04-07 20:44
下一篇 2025-04-07 20:45

相关推荐

  • 个人主题怎么制作?

    制作个人主题是一个将个人风格、兴趣或专业领域转化为视觉化或结构化内容的过程,无论是用于个人博客、作品集、社交媒体账号还是品牌形象,核心都是围绕“个人特色”展开,以下从定位、内容规划、视觉设计、技术实现四个维度,详细拆解制作个人主题的完整流程,明确主题定位:找到个人特色的核心主题定位是所有工作的起点,需要先回答……

    2025-11-20
    0
  • 社群营销管理关键是什么?

    社群营销的核心在于通过建立有温度、有价值、有归属感的社群,实现用户留存、转化和品牌传播,其管理需贯穿“目标定位-内容运营-用户互动-数据驱动-风险控制”全流程,以下从五个维度展开详细说明:明确社群定位与目标社群管理的首要任务是精准定位,需明确社群的核心价值(如行业交流、产品使用指导、兴趣分享等)、目标用户画像……

    2025-11-20
    0
  • 香港公司网站备案需要什么材料?

    香港公司进行网站备案是一个涉及多部门协调、流程相对严谨的过程,尤其需兼顾中国内地与香港两地的监管要求,由于香港公司注册地与中国内地不同,其网站若主要服务内地用户或使用内地服务器,需根据服务器位置、网站内容性质等,选择对应的备案路径(如工信部ICP备案或公安备案),以下从备案主体资格、流程步骤、材料准备、注意事项……

    2025-11-20
    0
  • 如何企业上云推广

    企业上云已成为数字化转型的核心战略,但推广过程中需结合行业特性、企业痛点与市场需求,构建系统性、多维度的推广体系,以下从市场定位、策略设计、执行落地及效果优化四个维度,详细拆解企业上云推广的实践路径,精准定位:明确目标企业与核心价值企业上云并非“一刀切”的方案,需先锁定目标客户群体,提炼差异化价值主张,客户分层……

    2025-11-20
    0
  • PS设计搜索框的实用技巧有哪些?

    在PS中设计一个美观且功能性的搜索框需要结合创意构思、视觉设计和用户体验考量,以下从设计思路、制作步骤、细节优化及交互预览等方面详细说明,帮助打造符合需求的搜索框,设计前的规划明确使用场景:根据网站或APP的整体风格确定搜索框的调性,例如极简风适合细线条和纯色,科技感适合渐变和发光效果,电商类则可能需要突出搜索……

    2025-11-20
    0

发表回复

您的邮箱地址不会被公开。必填项已用 * 标注