图解 Kafka 源码之 NetworkClient 网络通信组件架构设计
作者:王江华 2023-03-15 08:17:27
云计算
Kafka 今天主要聊聊「真正进行网络 I/O 的 NetworkClient 的架构设计」深度剖析下消息是如何被发送出去的。
大家好,我是 华仔, 又跟大家见面了。
上篇主要带大家深度剖析了「发送网络 I/O 的 Sender 线程的架构设计」,消息先被暂存然后调用网络I/O组件进行发送,今天主要聊聊「真正进行网络 I/O 的 NetworkClient 的架构设计」深度剖析下消息是如何被发送出去的。
认真读完这篇文章,我相信你会对 Kafka NetworkClient 的源码有更加深刻的理解。
这篇文章干货很多,希望你可以耐心读完。
一、总的概述
继续通过「场景驱动」的方式,来看看消息是如何在客户端被累加和待发送的。
在上篇中,我们知道了消息被 Sender 子线程先暂存到 KafkaChannel 的 Send 字段中,然后调用 NetworkClient#client.poll() 进行真正发送出去,如下图所示「6-11步」。
NetworkClient 为「生产者」、「消费者」、「服务端」等上层业务提供了网络I/O的能力。在 NetworkClient 内部使用了前面介绍的 Kafka 对 NIO 的封装组件,同时做了一定的封装,最终实现了网络I/O能力。NetworkClient 不仅仅用于客户端与服务端的通信,也用于服务端之间的通信。
接下来我们就来看看,「NetworkClient 网络I/O组件的架构实现以及发送处理流程」,为了方便大家理解,所有的源码只保留骨干。
二、NetworkClient 架构设计
NetworkClient 类是 KafkaClient 接口的实现类,它内部的重要字段有「Selectable」、「InflightRequest」以及内部类 「MetadataUpdate」。
github 源码地址如下:
https://github.com/apache/kafka/blob/2.7.0/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
https://github.com/apache/kafka/blob/2.7.0/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java
https://github.com/apache/kafka/blob/2.7.0/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
https://github.com/apache/kafka/blob/2.7.0/clients/src/main/java/org/apache/kafka/clients/ClientRequest.java
https://github.com/apache/kafka/blob/2.7.0/clients/src/main/java/org/apache/kafka/clients/ClientResponse.java
1、关键字段
publicclassNetworkClientimplementsKafkaClient {
// 状态枚举值
privateenumState {
ACTIVE,
CLOSING,
CLOSED
}
/* the selector used to perform network i/o */
// 用于执行网络 I/O 的选择器
privatefinalSelectableselector;
// Metadata元信息的更新器, 它可以尝试更新元信息
privatefinalMetadataUpdatermetadataUpdater;
/* the state of each node's connection */
// 管理集群所有节点连接的状态
privatefinalClusterConnectionStatesconnectionStates;
/* the set of requests currently being sent or awaiting a response */
// 当前正在发送或等待响应的请求集合
privatefinalInFlightRequestsinFlightRequests;
/* the socket send buffer size in bytes */
// 套接字发送数据的缓冲区的大小(以字节为单位)
privatefinalintsocketSendBuffer;
/* the socket receive size buffer in bytes */
// 套接字接收数据的缓冲区的大小(以字节为单位)
privatefinalintsocketReceiveBuffer;
/* the client id used to identify this client in requests to the server */
// 表示客户端id,标识客户端身份
privatefinalStringclientId;
/* the current correlation id to use when sending requests to servers */
// 向服务器发送请求时使用的当前关联 ID
privateintcorrelation;
/* default timeout for individual requests to await acknowledgement from servers */
// 单个请求等待服务器确认的默认超时
privatefinalintdefaultRequestTimeoutMs;
/* time in ms to wait before retrying to create connection to a server */
// 重连的退避时间
privatefinallongreconnectBackoffMs;
/**
* True if we should send an ApiVersionRequest when first connecting to a broker.
* 是否需要与 Broker 端的版本协调,默认为 true
* 如果为 true 当第一次连接到一个 broker 时,应当发送一个 version 的请求,用来得知 broker 的版本, 如果为 false 则不需要发送 version 的请求。
*/
privatefinalbooleandiscoverBrokerVersions;
// broker 端版本
privatefinalApiVersionsapiVersions;
// 存储着要发送的版本请求,key 为 nodeId,value 为构建请求的 Builder
privatefinalMap<String, ApiVersionsRequest.Builder>nodesNeedingApiVersionsFetch=newHashMap<>();
// 取消的请求集合
privatefinalList<ClientResponse>abortedSends=newLinkedList<>();
从该类属性字段来看比较多,这里说几个关键字段:
- selector:Kafka 自己封装的 Selector,该选择器负责监听「网络I/O事件」、「网络连接」、「读写操作」。
- metadataUpdater:NetworkClient 的内部类,主要用来实现Metadata元信息的更新器, 它可以尝试更新元信息。
- connectionStates:管理集群所有节点连接的状态,底层使用 Map<nodeid, NodeConnectionState>实现,NodeConnectionState 枚举值表示连接状态,并且记录了最后一次连接的时间戳。
- inFlightRequests:用来保存当前正在发送或等待响应的请求集合。
- socketSenderBuffer:表示套接字发送数据的缓冲区的大小。
- socketReceiveBuffer:表示套接字接收数据的缓冲区的大小。
- clientId:表示客户端id,标识客户端身份。
- reconnectBackoffMs:表示重连的退避事件,为了防止短时间内大量重连造成的网络压力,设计了这么一个时间段,在此时间段内不得重连。
2、关键方法
NetworkClient 类的方法也不少,这里针对关键方法逐一讲解下。
(1)ready()
/**
* Begin connecting to the given node, return true if we are already connected and ready to send to that node.
*
* @param node The node to check
* @param now The current timestamp
* @return True if we are ready to send to the given node
*/
@Override
publicbooleanready(Nodenode, longnow) {
// 空节点
if (node.isEmpty())
thrownewIllegalArgumentException("Cannot connect to empty node "+node);
// 1、判断节点是否准备好发送请求
if (isReady(node, now))
returntrue;
// 2、判断节点连接状态
if (connectionStates.canConnect(node.idString(), now))
// if we are interested in sending to a node and we don't have a connection to it, initiate one
// 3、初始化连接,但此时不一定连接成功了
initiateConnect(node, now);
returnfalse;
}
/**
* Check if the node with the given id is ready to send more requests.
* @param node The node
* @param now The current time in ms
* @return true if the node is ready
*/
@Override
publicbooleanisReady(Nodenode, longnow) {
// if we need to update our metadata now declare all requests unready to make metadata requests first priority
// 当发现正在更新元数据时,会禁止发送请求 && 当连接没有创建完毕或者当前发送的请求过多时,也会禁止发送请求
return!metadataUpdater.isUpdateDue(now) &&canSendRequest(node.idString(), now);
}
/**
* Are we connected and ready and able to send more requests to the given connection?
* 检测连接状态、发送请求是否过多
* @param node The node
* @param now the current timestamp
*/
privatebooleancanSendRequest(Stringnode, longnow) {
// 三个条件必须都满足
returnconnectionStates.isReady(node, now) &&selector.isChannelReady(node) &&
inFlightRequests.canSendMore(node);
}
该方法表示某个节点是否准备好并可以发送请求,主要做了三件事:
- 先判断节点是否已经准备好连接并接收请求了,需要满足以下四个条件:
- !metadataUpdater.isUpdateDue(now):不能是正在更新元数据的状态,且元数据不能过期。
- canSendRequest(node.idString(), now):此处有3个条件。(1)、客户端和 node 连接是否处于 ready 状态;(2)、客户端和 node 的 channel 是否建立好;(3)、inFlightRequests 中对应的节点是否可以接收更多的请求。
- 如果连接好返回 true 表示准备好,如果没有准备好接收请求,则会尝试与对应的 Node 连接,此处也需要满足两个条件:
首先连接必须是 isDisconnected,不能是 connecteding 状态,即客户端与服务端的连接状态是没有连接上。
两次重试之间时间差要大于重试退避时间,目的就是为了避免网络拥塞,防止重连过于频繁造成网络压力过大。
最后初始化连接。
(2)initiateConnect()
/**
* 创建连接
* Initiate a connection to the given node
* @param node the node to connect to
* @param now current time in epoch milliseconds
*/
privatevoidinitiateConnect(Nodenode, longnow) {
StringnodeConnectionId=node.idString();
try {
// 1、更新连接状态为正在连接
connectionStates.connecting(nodeConnectionId, now, node.host(), clientDnsLookup);
// 获取连接地址
InetAddressaddress=connectionStates.currentAddress(nodeConnectionId);
log.debug("Initiating connection to node {} using address {}", node, address);
// 2、调用 selector 尝试异步进行连接,后续通过selector.poll进行监听事件就绪
selector.connect(nodeConnectionId,
newInetSocketAddress(address, node.port()),
this.socketSendBuffer,
this.socketReceiveBuffer);
} catch (IOExceptione) {
log.warn("Error connecting to node {}", node, e);
// Attempt failed, we'll try again after the backoff
connectionStates.disconnected(nodeConnectionId, now);
// Notify metadata updater of the connection failure
metadataUpdater.handleServerDisconnect(now, nodeConnectionId, Optional.empty());
}
}
该方法主要是进行初始化连接,做了两件事:
- 调用 connectionStates.connecting() 更新连接状态为正在连接。
- 调用 selector.connect() 异步发起连接,此时不一定连接上了,后续 Selector.poll() 会监听连接是否准备好并完成连接,如果连接成功,则会将 ConnectionState 设置为 CONNECTED。
当连接准备好后,接下来我们来看下发送相关的方法。
(3)send()、doSend()
/**
* ClientRequest 是客户端的请求,封装了 requestBuilder
*/
publicfinalclassClientRequest {
// 节点地址
privatefinalStringdestination;
// ClientRequest 中通过 requestBuilder 给不同类型的请求设置不同的请求内容
privatefinalAbstractRequest.Builder<?>requestBuilder;
// 请求头的 correlationId
privatefinalintcorrelationId;
// 请求头的 clientid
privatefinalStringclientId;
// 创建时间
privatefinallongcreatedTimeMs;
// 是否需要进行响应
privatefinalbooleanexpectResponse;
// 请求的超时时间
privatefinalintrequestTimeoutMs;
// 回调函数 用来处理响应
privatefinalRequestCompletionHandlercallback;
......
}
/**
* Queue up the given request for sending. Requests can only be sent out to ready nodes.
* @param request The request
* @param now The current timestamp
* 发送请求,这个方法 生产者和消费者都会调用,其中 ClientRequest 表示客户端的请求。
*/
@Override
publicvoidsend(ClientRequestrequest, longnow) {
doSend(request, false, now);
}
// 检测请求版本是否支持,如果支持则发送请求
privatevoiddoSend(ClientRequestclientRequest, booleanisInternalRequest, longnow) {
// 确认是否活跃
ensureActive();
// 目标节点id
StringnodeId=clientRequest.destination();
// 是否是 NetworkClient 内部请求 这里为 false
if (!isInternalRequest) {
// 检测是否可以向指定 Node 发送请求,如果还不能发送请求则抛异常
if (!canSendRequest(nodeId, now))
thrownewIllegalStateException("Attempt to send a request to node "+nodeId+" which is not ready.");
}
AbstractRequest.Builder<?>builder=clientRequest.requestBuilder();
try {
// 检测版本
NodeApiVersionsversionInfo=apiVersions.get(nodeId);
// ... 忽略
// builder.build()是 ProduceRequest.Builder,结果是ProduceRequest
// 调用 doSend 方法
doSend(clientRequest, isInternalRequest, now, builder.build(version));
} catch (UnsupportedVersionExceptionunsupportedVersionException) { log.debug("Version mismatch when attempting to send {} with correlation id {} to {}", builder, clientRequest.correlationId(), clientRequest.destination(), unsupportedVersionException);
// 请求的版本不协调,那么生成 clientResponse
ClientResponseclientResponse=newClientResponse(clientRequest.makeHeader(builder.latestAllowedVersion()),
clientRequest.callback(), clientRequest.destination(), now, now,
false, unsupportedVersionException, null, null);
// 添加到 abortedSends 集合里
abortedSends.add(clientResponse);
}
}
/**
* isInternalRequest 表示发送前是否需要验证连接状态,如果为 true 则表示客户端已经确定连接是好的
* request表示请求体
*/
privatevoiddoSend(ClientRequestclientRequest, booleanisInternalRequest, longnow, AbstractRequestrequest) {
// 目标节点地址
Stringdestination=clientRequest.destination();
// 生成请求头
RequestHeaderheader=clientRequest.makeHeader(request.version());
if (log.isDebugEnabled()) {
log.debug("Sending {} request with header {} and timeout {} to node {}: {}",
clientRequest.apiKey(), header, clientRequest.requestTimeoutMs(), destination, request);
}
// 1、构建 NetworkSend 对象 结合请求头和请求体,序列化数据,保存到 NetworkSend
Sendsend=request.toSend(destination, header);
// 2、构建 inFlightRequest 对象 保存了发送前的所有信息
InFlightRequestinFlightRequest=newInFlightRequest(
clientRequest,
header,
isInternalRequest,
request,
send,
now);
// 3、把 inFlightRequest 加入 inFlightRequests 集合里
this.inFlightRequests.add(inFlightRequest);
// 4、调用 Selector 异步发送数据,并将 send 和对应 kafkaChannel 绑定起来,并开启该 kafkaChannel 底层 socket 的写事件,等待下一步真正的网络发送
selector.send(send);
}
@Override
publicbooleanactive() {
// 判断状态是否是活跃的
returnstate.get() ==State.ACTIVE;
}
// 确认是否活跃
privatevoidensureActive() {
if (!active())
thrownewDisconnectException("NetworkClient is no longer active, state is "+state);
}
从上面源码可以看出此处发送并不是真正的网络发送,而是先将数据发送到缓存中。
- 首先最外层是 send() ,里面调用 doSend() 。
- 这里的 doSend() 主要的作用是判断 inFlightRequests 集合上对应的节点是不是能发送请求,需要满足三个条件:
- 客户端和 node 连接是否处于 ready 状态。
- 客户端和 node 的 channel 是否建立好。
- inFlightRequests 集合中对应的节点是否可以接收更多的请求。
- 最后再次调用另一个 doSend(),用来最终的请求发送到缓存中。步骤如下:
- 构建 NetworkSend 对象 结合请求头和请求体,序列化数据,保存到 NetworkSend。
- 构建 inFlightRequest 对象。
- 把 inFlightRequest 加入 inFlightRequests 集合里等待响应。
- 调用Selector异步发送数据,并将 send 和对应 kafkaChannel 绑定起来,并开启该 kafkaChannel 底层 socket 的写事件,等待下一步真正的网络发送。
综上可以得出这里的发送过程其实是把要发送的请求先封装成 inFlightRequest,然后放到 inFlightRequests 集合里,然后放到对应 channel 的字段 NetworkSend 里缓存起来。总之,这里的发送过程就是为了下一步真正的网络I/O发送而服务的。
接下来看下真正网络发送的方法。
(4)poll()
该方法执行网络发送并把响应结果「pollSelectionKeys 的各种读写」做各种状态处理,此处是通过调用 handleXXX() 方法进行处理的,代码如下:
/**
* Do actual reads and writes to sockets.
* @param timeout The maximum amount of time to wait (in ms) for responses if there are none immediately,
* must be non-negative. The actual timeout will be the minimum of timeout, request timeout and
* metadata timeout
* @param now The current time in milliseconds
* @return The list of responses received
*/
@Override
publicList<ClientResponse>poll(longtimeout, longnow) {
// 确认是否活跃
ensureActive();
// 取消发送是否为空
if (!abortedSends.isEmpty()) {
// If there are aborted sends because of unsupported version exceptions or disconnects,
// handle them immediately without waiting for Selector#poll.
List<ClientResponse>responses=newArrayList<>();
handleAbortedSends(responses);
completeResponses(responses);
returnresponses;
}
// 1、尝试更新元数据
longmetadataTimeout=metadataUpdater.maybeUpdate(now);
try {
// 2、执行网络 I/O 操作,真正读写发送的地方,如果客户端的请求被完整的处理过了,会加入到completeSends 或 complteReceives 集合中
this.selector.poll(Utils.min(timeout, metadataTimeout, defaultRequestTimeoutMs));
} catch (IOExceptione) {
log.error("Unexpected error during I/O", e);
}
// process completed actions
longupdatedNow=this.time.milliseconds();
// 响应结果集合:真正的读写操作, 会生成responses
List<ClientResponse>responses=newArrayList<>();
// 3、完成发送的handler,处理 completedSends 集合
handleCompletedSends(responses, updatedNow);
// 4、完成接收的handler,处理 completedReceives 队列
handleCompletedReceives(responses, updatedNow);
// 5、断开连接的handler,处理 disconnected 列表
handleDisconnections(responses, updatedNow);
// 6、处理连接的handler,处理 connected 列表
handleConnections();
// 7、处理版本协调请求(获取api版本号) handler
handleInitiateApiVersionRequests(updatedNow);
// 8、超时连接的handler,处理超时连接集合
handleTimedOutConnections(responses, updatedNow);
// 9、超时请求的handler,处理超时请求集合
handleTimedOutRequests(responses, updatedNow);
// 10、完成响应回调
completeResponses(responses);
returnresponses;
}
这里的步骤比较多,我们按照先后顺序讲解下。
- 尝试更新元数据。
- 调用 Selector.poll() 执行真正网络 I/O 操作,可以点击查看 图解 Kafka 源码网络层实现机制之 Selector 多路复用器 主要操作以下3个集合。
- connected集合:已经完成连接的 Node 节点集合。
- completedReceives集合:接收完成的集合,即 KafkaChannel 上的 NetworkReceive 写满后会放入这个集合里。
- completedSends集合:发送完成的集合,即 channel 上的 NetworkSend 读完后会放入这个集合里。
- 调用 handleCompletedSends() 处理 completedSends 集合。
- 调用 handleCompletedReceives() 处理 completedReceives 队列。
- 调用 handleDisconnections() 处理与 Node 断开连接的请求。
- 调用 handleConnections() 处理 connected 列表。
- 调用 handleInitiateApiVersionRequests() 处理版本号请求。
- 调用 handleTimedOutConnections() 处理连接超时的 Node 集合。
- 调用 handleTimedOutRequests() 处理 inFlightRequests 集合中的超时请求,并修改其状态。
- 调用 completeResponses() 完成每个消息自定义的响应回调。
接下来看下第 3~9 步骤的方法实现。
(5)handleCompletedSends()
当 NetworkClient 发送完请求后,就会调用 handleCompletedSends 方法,表示请求已经发送到 Broker 端了。
/**
* Handle any completed request send. In particular if no response is expected consider the request complete.
* @param responses The list of responses to update
* @param now The current time
*/
privatevoidhandleCompletedSends(List<ClientResponse>responses, longnow) {
// if no response is expected then when the send is completed, return it
// 1、遍历 completedSends 发送完成的请求集合,通过调用 Selector 获取从上一次 poll 开始的请求
for (Sendsend : this.selector.completedSends()) {
// 2、从 inFlightRequests 集合获取该 Send 关联对应 Node 的队列取出最新的请求,但并没有从队列中删除,取出后判断这个请求是否期望得到响应
InFlightRequestrequest=this.inFlightRequests.lastSent(send.destination());
// 3、是否需要响应, 如果不需要响应,当Send请求完成时,就直接返回.还是有request.completed生成的ClientResponse对象
if (!request.expectResponse) {
// 4、如果不需要响应就取出 inFlightRequests 中该 Sender 关联对应 Node 的 inFlightRequest,即提取最新的请求
this.inFlightRequests.completeLastSent(send.destination());
// 5、调用 completed() 生成 ClientResponse,第一个参数为null,表示没有响应内容,把请求添加到 Responses 集合
responses.add(request.completed(null, now));
}
}
}
该方法主要用来在客户端发送请求后,对响应结果进行处理,做了五件事:
- 遍历 seletor 中的 completedSends 集合,逐个处理完成的 Send 对象。
- 从 inFlightRequests 集合获取该 Send 关联对应 Node 的队列中第一个元素,但并没有从队列中删除,取出后判断这个请求是否期望得到响应。
- 判断是否需要响应。
- 如果不需要响应就删除 inFlightRequests 中该 Sender 关联对应 Node 的 inFlightRequest,对于 Kafka 来说,有些请求是不需要响应的,对于发送完不用考虑是否发送成功的话,就构建 callback 为 null 的 Response 对象。
- 通过 InFlightRequest.completed(),生成 ClientResponse,第一个参数为 null 表示没有响应内容,最后把 ClientResponse 添加到 Responses 集合。
从上面源码可以看出,「completedSends」集合与「InflightRequests」集合协作的关系。
但是这里有个问题:如何保证从 Selector 返回的请求,就是对应到 InflightRequests 集合队列的最新的请求呢?
completedSends 集合保存的是最近一次调用 poll() 方法中发送成功的请求「发送成功但还没有收到响应的请求集合」。而 InflightRequests 集合存储的是已经发送但还没收到响应的请求。每个请求发送都需要等待前面的请求发送完成,这样就能保证同一时间只有一个请求正在发送,因为 Selector 返回的请求是从上一次 poll 开始的,这样就对上了。
「completedSends」的元素对应着「InflightRequests」集合里对应队列的最后一个元素, 如下图所示:
(6)handleCompletedReceives()
当 NetworkClient 收到响应时,就会调用 handleCompletedReceives 方法。
/**
* Handle any completed receives and update the response list with the responses received.
* @param responses The list of responses to update
* @param now The current time
* 处理 CompletedReceives 队列,根据返回的响应信息实例化 ClientResponse ,并加到响应集合里
*/
privatevoidhandleCompletedReceives(List<ClientResponse>responses, longnow) {
// 1、遍历 CompletedReceives 响应集合,通过 Selector 返回未处理的响应
for (NetworkReceivereceive : this.selector.completedReceives()) {
// 2、获取发送请求的 Node id
Stringsource=receive.source();
// 3、从 inFlightRequests 集合队列获取已发送请求「最老的请求」并删除(从 inFlightRequests 删除,因为inFlightRequests 存储的是未收到请求响应的 ClientRequest,现在请求已经有响应了,就不需要保存了)
InFlightRequestreq=inFlightRequests.completeNext(source);
// 4、解析响应,并且验证响应头,生成 responseStruct 实例
StructresponseStruct=parseStructMaybeUpdateThrottleTimeMetrics(receive.payload(), req.header,throttleTimeSensor, now);
// 生成响应体
AbstractResponseresponse=AbstractResponse.parseResponse(req.header.apiKey(), responseStruct, req.header.apiVersion());
....
// If the received response includes a throttle delay, throttle the connection.
// 流控处理
maybeThrottle(response, req.header.apiVersion(), req.destination, now);
// 5、判断返回类型
if (req.isInternalRequest&&responseinstanceofMetadataResponse)
// 处理元数据请求响应
metadataUpdater.handleSuccessfulResponse(req.header, now, (MetadataResponse) response);
elseif (req.isInternalRequest&&responseinstanceofApiVersionsResponse)
// 处理版本协调响应
handleApiVersionsResponse(responses, req, now, (ApiVersionsResponse) response);
else
// 普通发送消息的响应,通过 InFlightRequest.completed(),生成 ClientResponse,将响应添加到 responses 集合中
responses.add(req.completed(response, now));
}
}
// 解析响应,并且验证响应头,生成 responseStruct 实例
privatestaticStructparseStructMaybeUpdateThrottleTimeMetrics(ByteBufferresponseBuffer, RequestHeaderrequestHeader, SensorthrottleTimeSensor, longnow) {
// 解析响应头
ResponseHeaderresponseHeader=ResponseHeader.parse(responseBuffer,
requestHeader.apiKey().responseHeaderVersion(requestHeader.apiVersion()));
// 解析响应体
StructresponseBody=requestHeader.apiKey().parseResponse(requestHeader.apiVersion(), responseBuffer);
// 验证请求头与响应头的 correlation id 必须相等
correlate(requestHeader, responseHeader);
if (throttleTimeSensor!=null&&responseBody.hasField(CommonFields.THROTTLE_TIME_MS))
throttleTimeSensor.record(responseBody.get(CommonFields.THROTTLE_TIME_MS), now);
returnresponseBody;
}
该方法主要用来处理接收完毕的网络请求集合,做了五件事:
- 遍历 selector 中的 completedReceives 集合,逐个处理完成的 Receive 对象。
- 获取发送请求的 Node id。
- 从 inFlightRequests 集合队列获取已发送请求「最老的请求」并删除(从 inFlightRequests 删除,因为inFlightRequests 存储的是未收到请求响应的 ClientRequest,现在请求已经有响应了,就不需要保存了)。
- 解析响应,并且验证响应头,生成 responseStruct 实例,生成响应体。
- 处理响应结果,此处分为三种情况:
- 处理元数据请求响应,则调用 metadataUpdater.handleSuccessfulResponse()。
- 处理版本协调响应,则调用 handleApiVersionsResponse()。
- 普通发送消息的响应,通过 InFlightRequest.completed(),生成 ClientResponse,将响应添加到 responses 集合中。
从上面源码可以看出,「completedReceives」集合与「InflightRequests」集合也有协作的关系, completedReceives 集合指的是接收到的响应集合,如果请求已经收到响应了,就可以从 InflightRequests 删除了,这样 InflightRequests 就起到了可以防止请求堆积的作用。
与 「completedSends」正好相反,「completedReceives」集合对应 「InflightRequests」集合里对应队列的第一个元素,如下图所示:
(7)leastLoadedNode()
/**
* Choose the node with the fewest outstanding requests which is at least eligible for connection. This method will
* prefer a node with an existing connection, but will potentially choose a node for which we don't yet have a
* connection if all existing connections are in use. If no connection exists, this method will prefer a node
* with least recent connection attempts. This method will never choose a node for which there is no
* existing connection and from which we have disconnected within the reconnect backoff period, or an active
* connection which is being throttled.
*
* @return The node with the fewest in-flight requests.
*/
@Override
publicNodeleastLoadedNode(longnow) {
// 从元数据中获取所有的节点
List<Node>nodes=this.metadataUpdater.fetchNodes();
if (nodes.isEmpty())
thrownewIllegalStateException("There are no nodes in the Kafka cluster");
intinflight=Integer.MAX_VALUE;
NodefoundConnecting=null;
NodefoundCanConnect=null;
NodefoundReady=null;
intoffset=this.randOffset.nextInt(nodes.size());
for (inti=0; i<nodes.size(); i++) {
intidx= (offset+i) %nodes.size();
Nodenode=nodes.get(idx);
// 节点是否可以发送请求
if (canSendRequest(node.idString(), now)) {
// 获取节点的队列大小
intcurrInflight=this.inFlightRequests.count(node.idString());
// 如果为 0 则返回该节点,负载最小
if (currInflight==0) {
// if we find an established connection with no in-flight requests we can stop right away
log.trace("Found least loaded node {} connected with no in-flight requests", node);
returnnode;
} elseif (currInflight<inflight) { // 如果队列大小小于最大值
// otherwise if this is the best we have found so far, record that
inflight=currInflight;
foundReady=node;
}
} elseif (connectionStates.isPreparingConnection(node.idString())) {
foundConnecting=node;
} elseif (canConnect(node, now)) {
if (foundCanConnect==null||
this.connectionStates.lastConnectAttemptMs(foundCanConnect.idString()) >
this.connectionStates.lastConnectAttemptMs(node.idString())) {
foundCanConnect=node;
}
} else {
log.trace("Removing node {} from least loaded node selection since it is neither ready "+
"for sending or connecting", node);
}
}
// We prefer established connections if possible. Otherwise, we will wait for connections
// which are being established before connecting to new nodes.
if (foundReady!=null) {
log.trace("Found least loaded node {} with {} inflight requests", foundReady, inflight);
returnfoundReady;
} elseif (foundConnecting!=null) {
log.trace("Found least loaded connecting node {}", foundConnecting);
returnfoundConnecting;
} elseif (foundCanConnect!=null) {
log.trace("Found least loaded node {} with no active connection", foundCanConnect);
returnfoundCanConnect;
} else {
log.trace("Least loaded node selection failed to find an available node");
returnnull;
}
}
该方法主要是选出一个负载最小的节点,如下图所示:
三、InflightRequests 集合设计
通过上面的代码分析,我们知道「InflightRequests」集合的作用就是缓存已经发送出去但还没有收到响应的 ClientRequest 请求集合。底层是通过 ReqMap<string, Deque<NetworkClient.InFlightRequest>> 实现,其中 key 是 NodeId,value 是发送到对应 Node 的 ClientRequest 请求队列,默认为5个,参数:max.in.flight.requests.per.connection 配置请求队列大小。它为每个连接生成一个双端队列,因此它能控制请求发送的速度。
其作用有以下2个:
- 节点是否正常:收集从「开始发送」到「接收响应」这段时间的请求,来判断要发送的 Broker 节点是否正常,请求和连接是否超时等等,也就是说用来监控发送到哥哥节点请求是否正常。
- 节点的负载情况:Deque 队列到一定长度后就认为某个 Broker 节点负载过高了。
/**
* The set of requests which have been sent or are being sent but haven't yet received a response
* 用来缓存已经发送出去或者正在发送但均还没有收到响应的 ClientRequest 请求集合
*/
finalclassInFlightRequests {
// 每个连接最大执行中的请求数
privatefinalintmaxInFlightRequestsPerConnection;
// 节点 Node 至客户端请求双端队列 Deque<NetworkClient.InFlightRequest> 的映射集合,key为 NodeId, value 是请求队列
privatefinalMap<String, Deque<NetworkClient.InFlightRequest>>requests=newHashMap<>();
/** Thread safe total number of in flight requests. */
// 线程安全的 inFlightRequestCount
privatefinalAtomicIntegerinFlightRequestCount=newAtomicInteger(0);
// 设置每个连接最大执行中的请求数
publicInFlightRequests(intmaxInFlightRequestsPerConnection) {
this.maxInFlightRequestsPerConnection=maxInFlightRequestsPerConnection;
}
这里通过「场景驱动」的方式来讲解关键方法,当有新请求需要发送处理时,会在队首入队。而实际被处理的请求,则是从队尾出队,保证入队早的请求先得到处理。
1、canSendMore()
先来看下发送条件限制, NetworkClient 调用这个方法用来判断是否还可以向指定 Node 发送请求。
/**
* Can we send more requests to this node?
* @param node Node in question
* @return true iff we have no requests still being sent to the given node
* 判断该连接是否还能发送请求
*/
publicbooleancanSendMore(Stringnode) {
// 获取节点对应的双端队列
Deque<NetworkClient.InFlightRequest>queue=requests.get(node);
// 判断条件 队列为空 || (队首已经发送完成 && 队列中没有堆积更多的请求)
returnqueue==null||queue.isEmpty() ||
(queue.peekFirst().send.completed() &&queue.size() <this.maxInFlightRequestsPerConnection);
}
从上面代码可以看出限制条件,队列虽然可以存储多个请求,但是新的请求要是加进来条件是上一个请求必须发送成功。
条件判断如下:
- queue == null || queue.isEmpty(),队列为空就能发送。
- 判断 queue.peekFirst().send.completed() 队首是否发送完成。
- 如果队首的请求迟迟发送不出去,可能就是网络的原因,因此不能继续向此 Node 发送请求。
- 队首的请求与对应的 KafkaChannel.send 字段指向的是同一个请求,为了避免未发送的消息被覆盖掉,也不能让 KafkaChannel.send 字段指向新请求。
- queue.size() < this.maxInFlightRequestsPerConnection,该条件就是为了判断队列中是否堆积过多请求,如果 Node 已经堆积了很多未响应的请求,说明这个节点出现了网络拥塞,继续再发送请求,则可能会超时。
2、add() 入队
/**
* Add the given request to the queue for the connection it was directed to
* 将请求添加到队列首部
*/
publicvoidadd(NetworkClient.InFlightRequestrequest) {
// 这个请求要发送到哪个 Broker 节点上
Stringdestination=request.destination;
// 从 requests 集合中根据给定请求的目标 Node 节点获取对应 Deque<ClientRequest> 双端队列 reqs
Deque<NetworkClient.InFlightRequest>reqs=this.requests.get(destination);
// 如果双端队列reqs为null
if (reqs==null) {
// 构造一个双端队列 ArrayDeque 类型的 reqs
reqs=newArrayDeque<>();
// 将请求目标 Node 节点至 reqs 的映射关系添加到 requests 集合
this.requests.put(destination, reqs);
}
// 将请求 request 添加到 reqs 队首
reqs.addFirst(request);
// 增加计数
inFlightRequestCount.incrementAndGet();
}
3、completeNext() 出队最老请求
/**
* Get the oldest request (the one that will be completed next) for the given node
* 取出该连接对应的队列中最老的请求
*/
publicNetworkClient.InFlightRequestcompleteNext(Stringnode) {
// 根据给定 Node 节点获取客户端请求双端队列 reqs,并从队尾出队
NetworkClient.InFlightRequestinFlightRequest=requestQueue(node).pollLast();
// 递减计数器
inFlightRequestCount.decrementAndGet();
returninFlightRequest;
}
对比下入队和出队这2个方法,「入队 add()」时是通过 addFirst() 方法添加到队首的,所以队尾的请求是时间最久的,也是应该先处理的,所以「出队 completeNext()」是通过 pollLast(),将队列中时间最久的请求袁术移出进行处理。
4、lastSent() 获取最新请求
/**
* Get the last request we sent to the given node (but don't remove it from the queue)
* @param node The node id
*/
publicNetworkClient.InFlightRequestlastSent(Stringnode) {
returnrequestQueue(node).peekFirst();
}
5、completeLastSent() 出队最新请求
/**
* Complete the last request that was sent to a particular node.
* @param node The node the request was sent to
* @return The request
* 取出该连接对应的队列中最新的请求
*/
publicNetworkClient.InFlightRequestcompleteLastSent(Stringnode) {
// 根据给定 Node 节点获取客户端请求双端队列 reqs,并从队首出队
NetworkClient.InFlightRequestinFlightRequest=requestQueue(node).pollFirst();
// 递减计数器
inFlightRequestCount.decrementAndGet();
returninFlightRequest;
}
最后我们来看看「InflightRequests」,表示正在发送的请求,存储着请求发送前的所有信息。
另外它支持生成响应 ClientResponse,当正常收到响应时,completed()会根据响应内容生成对应的 ClientResponse,当连接突然断开后,disconnected() 会生成 ClientResponse 对象,代码如下:
staticclassInFlightRequest {
// 请求头
finalRequestHeaderheader;
// 这个请求要发送到哪个 Broker 节点上
finalStringdestination;
// 回调函数
finalRequestCompletionHandlercallback;
// 是否需要进行响应
finalbooleanexpectResponse;
// 请求体
finalAbstractRequestrequest;
// 发送前是否需要验证连接状态
finalbooleanisInternalRequest; // used to flag requests which are initiated internally by NetworkClient
// 请求的序列化数据
finalSendsend;
// 发送时间
finallongsendTimeMs;
// 请求的创建时间,即 ClientRequest 的创建时间
finallongcreatedTimeMs;
// 请求超时时间
finallongrequestTimeoutMs;
.....
/**
* 收到响应,回调的时候据响应内容生成 ClientResponse
*/
publicClientResponsecompleted(AbstractResponseresponse, longtimeMs) {
returnnewClientResponse(header, callback, destination, createdTimeMs, timeMs,
false, null, null, response);
}
/**
* 当连接突然断开,也会生成 ClientResponse。
*/
publicClientResponsedisconnected(longtimeMs, AuthenticationExceptionauthenticationException) {
returnnewClientResponse(header, callback, destination, createdTimeMs, timeMs,
true, null, authenticationException, null);
}
}
中间的部分代码请移步到星球查看。
五、完整请求流程串联
一条完整的请求主要分为以下几个阶段:
- 调用 NetworkClient 的 ready(),连接服务端。
- 调用 NetworkClient 的 poll(),处理连接。
- 调用 NetworkClient 的 newClientRequest(),创建请求 ClientRequest。
- 然后调用 NetworkClient 的 send(),发送请求。
- 最后调用 NetworkClient 的 poll(),处理响应。
1、创建连接过程
NetworkClient 发送请求之前,都需要先和 Broker 端创建连接。NetworkClient 负责管理与集群的所有连接。
2、生成请求过程
3、发送请求过程
4、处理响应过程
(1)请求发送完成
(2)请求收到响应
(3)执行处理响应
六、总结
这里,我们一起来总结一下这篇文章的重点。
1、开篇总述消息消息被 Sender 子线程先将消息暂存到 KafkaChannel 的 send 中,等调用「poll方法」执行真正的网络I/O 操作,从而引出了为客户端提供网络 I/O 能力的 「NetworkClient 组件」。
2、带你深度剖析了「NetworkClient 组件」 、「InflightRequests」、「ClusterConnectionState」的实现细节。
3、最后带你串联了整个消息发送请求和处理响应的流程,让你有个更好的整体认知。
文章来源网络,作者:运维,如若转载,请注明出处:https://shuyeidc.com/wp/249695.html<

