内容简介:这一章节主要对和Listener相关的四个配置项做以详细解释。在解释这些配置项之前,我们先来明确几个概念。
这一章节主要对和Listener相关的四个配置项做以详细解释。 listeners
、 advertised.listeners
、 listener.security.protocol.map
、 inter.broker.listener.name
这四个配置项可能是大家最容易混淆和最不容易理解的。
在解释这些配置项之前,我们先来明确几个概念。
- 部署Broker的阿里云ECS称为Host Machine。
- 在阿里云ECS里启动的Producer或者Consumer,比如使用Kafka CLI启动的称为Internal Client。
- 在大家的IDEA中使用 Java 编写的,或者第三方的Producer/Consumer,称为External Client。
- Host Machine具有外网IP和内网IP。
- Internal Client可以同时和Host Machine的外网IP及内网IP通信。
- External Client只能和Host Machine的外网IP通信。
- 多个阿里云ECS之间可以同时通过外网IP及内网IP通信。
- 既在这个特定的场景下,Host Machine之间可以同时通过外网IP及内网IP通信。
- 再换句话说就是不同Host Machine上的Broker之间可以同时通过外网IP及内网IP通信。
如上图所示,是一个很常见的Kafka集群场景,涵盖了上述的概念。图中那些通信虚线箭头就是靠Kafka的Listener建立的,并且是通过Kafka中不同的Listener建立的,这些Listener分为Internal Listener和External Listener。如下图所示:
那么这些Listener的创建以及内外部如何通信都是由上面那四个配置项决定的。
listener.security.protocol.map
先来看 listener.security.protocol.map
配置项,在上一章节中介绍过,它是配置监听者的安全协议的,比如 PLAINTEXT
、 SSL
、 SASL_PLAINTEXT
、 SASL_SSL
。因为它是以Key/Value的形式配置的,所以往往我们也使用该参数给Listener命名:
listener.security.protocol.map=EXTERNAL_LISTENER_CLIENTS:SSL,INTERNAL_LISTENER_CLIENTS:PLAINTEXT,INTERNAL_LISTENER_BROKER:PLAINTEXT
使用Key作为Listener的名称。就如上图所示,Internal Producer、External Producer、Internal Consumer、External Consumer和Broker通信以及Broker之间互相通信时都很有可能使用不同的Listener。这些不同的Listener有监听内网IP的,有监听外网IP的,还有不同安全协议的,所以使用Key来表示更加直观。当然这只是一种非官方的用法,Key本质上还是代表了安全协议,如果只有一个安全协议,多个Listener的话,那么这些Listener所谓的名称肯定都是相同的。
listeners
listeners
就是主要用来定义Kafka Broker的Listener的配置项。
listeners=EXTERNAL_LISTENER_CLIENTS://阿里云ECS外网IP:9092,INTERNAL_LISTENER_CLIENTS://阿里云ECS内网IP:9093,INTERNAL_LISTENER_BROKER://阿里云ECS内网IP:9094
上面的配置表示,这个Broker定义了三个Listener,一个External Listener,用于External Producer和External Consumer连接使用。也许因为业务场景的关系,Internal Producer和Broker之间使用不同的安全协议进行连接,所以定义了两个不同协议的Internal Listener,分别用于Internal Producer和Broker之间连接使用。
通过之前的章节,我们知道Kafka是由Zookeeper进行管理的,由Zookeeper负责Leader选举,Broker Rebalance等工作。所以External Producer和External Consumer其实是通过Zookeeper中提供的信息和Broker通信交互的。所以 listeners
中配置的信息都会发布到Zookeeper中,但是这样就会把Broker的所有Listener信息都暴露给了外部Clients,在安全上是存在隐患的,我们希望只把给外部Clients使用的Listener暴露出去,此时就需要用到下面这个配置项了。
advertised.listeners
advertised.listeners
参数的作用就是将Broker的Listener信息发布到Zookeeper中,供Clients(Producer/Consumer)使用。如果配置了 advertised.listeners
,那么就不会将 listeners
配置的信息发布到Zookeeper中去了:
advertised.listeners=EXTERNAL_LISTENER_CLIENTS://阿里云ECS外网IP:9092
这里在Zookeeper中发布了供External Clients(Producer/Consumer)使用的Listener EXTERNAL_LISTENER_CLIENTS
。所以 advertised.listeners
配置项实现了只把给外部Clients使用的Listener暴露出去的需求。
inter.broker.listener.name
这个配置项从名称就可以看出它的作用了,就是指定一个 listener.security.protocol.map
配置项中配置的Key,或者说指定一个或一类Listener的名称,将它作为Internal Listener。这个Listener 专门用于Kafka集群中Broker之间的通信 :
inter.broker.listener.name=INTERNAL_LISTENER_BROKER
listener 和 advertised.listeners 的关系
先来看看 KafkaConfig.scala
和 SocketServer.scala
源码中的这几行代码片段:
// KafkaConfig.scala ... val ListenersProp = "listeners" ... def dataPlaneListeners: Seq[EndPoint] = { Option(getString(KafkaConfig.ControlPlaneListenerNameProp)) match { case Some(controlPlaneListenerName) => listeners.filterNot(_.listenerName.value() == controlPlaneListenerName) case None => listeners } } ... def listeners: Seq[EndPoint] = { Option(getString(KafkaConfig.ListenersProp)).map { listenerProp => CoreUtils.listenerListToEndPoints(listenerProp, listenerSecurityProtocolMap) }.getOrElse(CoreUtils.listenerListToEndPoints("PLAINTEXT://" + hostName + ":" + port, listenerSecurityProtocolMap)) } // SocketServer.scala def startup(startupProcessors: Boolean = true) { this.synchronized { connectionQuotas = new ConnectionQuotas(config.maxConnectionsPerIp, config.maxConnectionsPerIpOverrides) createControlPlaneAcceptorAndProcessor(config.controlPlaneListener) createDataPlaneAcceptorsAndProcessors(config.numNetworkThreads, config.dataPlaneListeners) if (startupProcessors) { startControlPlaneProcessor() startDataPlaneProcessors() } } ... private def createDataPlaneAcceptorsAndProcessors(dataProcessorsPerListener: Int, endpoints: Seq[EndPoint]): Unit = synchronized { endpoints.foreach { endpoint => val dataPlaneAcceptor = createAcceptor(endpoint) addDataPlaneProcessors(dataPlaneAcceptor, endpoint, dataProcessorsPerListener) KafkaThread.nonDaemon(s"data-plane-kafka-socket-acceptor-${endpoint.listenerName}-${endpoint.securityProtocol}-${endpoint.port}", dataPlaneAcceptor).start() dataPlaneAcceptor.awaitStartup() dataPlaneAcceptors.put(endpoint, dataPlaneAcceptor) info(s"Created data-plane acceptor and processors for endpoint : $endpoint") } }
startup()
方法是Kafka Broker创建启动Socket连接的入口,既用来创建Acceptor线程的入口,该线程负责处理Socket连接。 createDataPlaneAcceptorsAndProcessors()
方法的第二个参数 config.dataPlaneListeners
可以看到取的就是 listeners
配置项的内容。
/** * Create a server socket to listen for connections on. */ private def openServerSocket(host: String, port: Int): ServerSocketChannel = { val socketAddress = if (host == null || host.trim.isEmpty) new InetSocketAddress(port) else new InetSocketAddress(host, port) val serverChannel = ServerSocketChannel.open() serverChannel.configureBlocking(false) if (recvBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE) serverChannel.socket().setReceiveBufferSize(recvBufferSize) try { serverChannel.socket.bind(socketAddress) info("Awaiting socket connections on %s:%d.".format(socketAddress.getHostString, serverChannel.socket.getLocalPort)) } catch { case e: SocketException => throw new KafkaException("Socket server failed to bind to %s:%d: %s.".format(socketAddress.getHostString, port, e.getMessage), e) } serverChannel }
跟到里面,可以看到如果没有配置 listeners
,那么会使用网卡地址创建Socket连接,对于阿里云ECS,就是内网IP。
再来看看 KafkaServer.scala
源码中的这几行代码片段:
... val brokerInfo = createBrokerInfo val brokerEpoch = zkClient.registerBroker(brokerInfo) ... private[server] def createBrokerInfo: BrokerInfo = { val endPoints = config.advertisedListeners.map(e => s"${e.host}:${e.port}") zkClient.getAllBrokersInCluster.filter(_.id != config.brokerId).foreach { broker => val commonEndPoints = broker.endPoints.map(e => s"${e.host}:${e.port}").intersect(endPoints) require(commonEndPoints.isEmpty, s"Configured end points ${commonEndPoints.mkString(",")} in" + s" advertised listeners are already registered by broker ${broker.id}") } val listeners = config.advertisedListeners.map { endpoint => if (endpoint.port == 0) endpoint.copy(port = socketServer.boundPort(endpoint.listenerName)) else endpoint } val updatedEndpoints = listeners.map(endpoint => if (endpoint.host == null || endpoint.host.trim.isEmpty) endpoint.copy(host = InetAddress.getLocalHost.getCanonicalHostName) else endpoint ) val jmxPort = System.getProperty("com.sun.management.jmxremote.port", "-1").toInt BrokerInfo(Broker(config.brokerId, updatedEndpoints, config.rack), config.interBrokerProtocolVersion, jmxPort) }
从上面的代码可以看到, advertised.listeners
主要用于向Zookeeper注册Broker的连接信息,但是不参与创建Socket连接。
所以从这几处源码内容可以得出结论,Kafka Broker真正建立通信连接使用的是 listeners
配置项里的内容,而 advertised.listeners
只用于向Zookeeper注册Broker的连接信息,既向Client暴露Broker对外的连接信息(Endpoint)。
另外在 KafkaConfig.scala
源码中还有有这么几行代码:
val advertisedListenerNames = advertisedListeners.map(_.listenerName).toSet val listenerNames = listeners.map(_.listenerName).toSet require(advertisedListenerNames.contains(interBrokerListenerName), s"${KafkaConfig.InterBrokerListenerNameProp} must be a listener name defined in ${KafkaConfig.AdvertisedListenersProp}. " + s"The valid options based on currently configured listeners are ${advertisedListenerNames.map(_.value).mkString(",")}") require(advertisedListenerNames.subsetOf(listenerNames), s"${KafkaConfig.AdvertisedListenersProp} listener names must be equal to or a subset of the ones defined in ${KafkaConfig.ListenersProp}. " + s"Found ${advertisedListenerNames.map(_.value).mkString(",")}. The valid options based on the current configuration " + s"are ${listenerNames.map(_.value).mkString(",")}"
从上面的代码片段可以得出两个结论:
-
advertised.listeners
配置项中配置的Listener名称或者说安全协议必须在listeners
中存在。因为真正创建连接的是listeners
中的信息。 -
inter.broker.listener.name
配置项中配置的Listener名称或者说安全协议必须在advertised.listeners
中存在。因为Broker之间也是要通过advertised.listeners
配置项获取Internal Listener信息的。
以上所述就是小编给大家介绍的《Kafka从上手到实践-Kafka集群:Kafka Listeners》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!
猜你喜欢:- Kafka从上手到实践-Kafka集群:启动Kafka集群
- Kafka从上手到实践-Kafka集群:配置Broker
- Kafka从上手到实践-实践真知:搭建Zookeeper集群
- Kafka从上手到实践-Kafka集群:重要配置和性能探讨
- 快速上手virtualenv
- MongoDB 简单上手
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。