Troubleshooting: kafka-manager 1.3.3.18 does not display Consumers with Kafka 2.1.0


I was running Kafka 2.1.0 and managing it with the latest kafka-manager, 1.3.3.18. I wrote a simple producer and a consumer, but after running them the consumer group simply would not show up in kafka-manager, no matter what I tried.

The producer code is as follows:

package com.kafka.producer;

import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class ProducerDemo {

    private static Producer<String, Object> producer;

    private static final String server = "127.0.0.1:9092";

    static {
        Properties props = buildProducerConfig();
        producer = new KafkaProducer<>(props);
    }

    public static void main(String[] args) {
        int i = 0;
        while (true) {
            i++;
            try {
                send("test", String.format("test_%d", i), "123");
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
            System.out.println(String.format("Written to Kafka: %d", i));
        }
    }

    private static Properties buildProducerConfig() {
        Properties props = new Properties();
        // bootstrap.servers is the address of the Kafka cluster, i.e. the broker address
        props.put("bootstrap.servers", server);
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    // Synchronous send: blocks until the broker acknowledges the record
    public static RecordMetadata send(String topic, String key, Object obj) throws InterruptedException, ExecutionException {
        return producer.send(new ProducerRecord<String, Object>(topic, key, obj)).get();
    }

    // Asynchronous send: errors are reported through the callback
    public static void sendAsync(String topic, String key, Object obj) {
        producer.send(new ProducerRecord<String, Object>(topic, key, obj), new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception e) {
                if (e != null) {
                    System.out.println(ExceptionUtils.getStackTrace(e));
                }
            }
        });
    }
}

The consumer code is as follows:

package com.kafka.consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;

public class ConsumerDemo {

    private static KafkaConsumer<String, Object> consumer;

    private static final String server = "127.0.0.1:9092";

    static {
        Properties props = buildConsumerConfig();
        consumer = new KafkaConsumer<>(props);
    }

    public static void main(String[] args) {
        consume();
    }

    private static Properties buildConsumerConfig() {
        Properties props = new Properties();
        props.put("bootstrap.servers", server);
        // Consumer group id
        props.put("group.id", "testGroup");
        props.put("enable.auto.commit", "true");
        // How often the consumed offsets are committed back to Kafka
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    public static void consume() {
        consumer.subscribe(Arrays.asList("test"));
        while (true) {
            // Poll for new records, waiting at most 100 ms
            ConsumerRecords<String, Object> records = consumer.poll(100);
            for (ConsumerRecord<String, Object> record : records) {
                System.out.printf("partition=%d, offset=%d, key=%s, value=%s\n", record.partition(),
                        record.offset(), record.key(), record.value());
            }
        }
    }
}
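Before pinning this on kafka-manager, it is worth confirming on the broker side that the group really is registered; something like the following should do it with the stock Kafka CLI tools (adjust the bootstrap address to your own setup):

bin/kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --list
bin/kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --describe --group testGroup

If testGroup is listed there with its partitions and offsets, the broker side is fine and the problem lies with kafka-manager.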

In my case the consumer group still would not show up in kafka-manager, so to track down the cause I went through the kafka-manager logs and found the following error:

[warn] k.m.a.c.KafkaManagedOffsetCache - Failed to process a message from offset topic on cluster test-Kafka!
kafka.common.KafkaException: Unknown offset schema version 3
at kafka.manager.utils.one10.GroupMetadataManager$.schemaForOffset(GroupMetadataManager.scala:428) ~[kafka-manager.kafka-manager-1.3.3.18-sans-externalized.jar:na]
at kafka.manager.utils.one10.GroupMetadataManager$.readOffsetMessageValue(GroupMetadataManager.scala:532) ~[kafka-manager.kafka-manager-1.3.3.18-sans-externalized.jar:na]
at kafka.manager.actor.cluster.KafkaManagedOffsetCache$$anonfun$run$4.apply(KafkaStateActor.scala:332) [kafka-manager.kafka-manager-1.3.3.18-sans-externalized.jar:na]
at kafka.manager.actor.cluster.KafkaManagedOffsetCache$$anonfun$run$4.apply(KafkaStateActor.scala:308) [kafka-manager.kafka-manager-1.3.3.18-sans-externalized.jar:na]
at scala.util.Success.foreach(Try.scala:236) [org.scala-lang.scala-library-2.11.12.jar:na]
at kafka.manager.actor.cluster.KafkaManagedOffsetCache.run(KafkaStateActor.scala:308) [kafka-manager.kafka-manager-1.3.3.18-sans-externalized.jar:na]
at java.lang.Thread.run(Thread.java:745) [na:1.8.0_74]

My initial diagnosis was that this is a kafka-manager problem, so I decided to dig into it. kafka-manager turns out to be written in Scala, which I do not know, so at first I had no idea where to start. Still, code is code, so I went looking for the cause. The stack trace points at GroupMetadataManager.scala:428, so the problem had to be around there. As far as I can tell, the root cause is that Kafka 2.1.0 writes offset commit values with newer schema versions (version 2 drops the expire_timestamp field, version 3 adds a leader_epoch field), while kafka-manager 1.3.3.18 only knows value schema versions 0 and 1, hence the "Unknown offset schema version 3" error.

Googling turned up no good solution, so I opened an issue on the kafka-manager GitHub project and found that someone had already gotten consumers to display after modifying the source code. Following that person's hints, I modified the Scala source, recompiled and repackaged kafka-manager, and the problem was finally solved.
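Incidentally, what the broker is actually writing into the internal offsets topic can be inspected by dumping __consumer_offsets with the console consumer and the OffsetsMessageFormatter mentioned in the source below; the broker's own copy of the parser understands the newer schema versions, it is only the copy bundled with kafka-manager 1.3.3.18 that is out of date. Something along these lines should work (bootstrap address as used in this post):

bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 \
  --topic __consumer_offsets --from-beginning \
  --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" \
  --consumer-property exclude.internal.topics=false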

The change to the Scala source, as a diff against kafka-manager master:

git diff origin/master

diff --git a/app/kafka/manager/utils/one10/GroupMetadataManager.scala b/app/kafka/manager/utils/one10/GroupMetadataManager.scala
index 85771cd..f16b1a3 100644
--- a/app/kafka/manager/utils/one10/GroupMetadataManager.scala
+++ b/app/kafka/manager/utils/one10/GroupMetadataManager.scala
@@ -368,6 +368,25 @@ object GroupMetadataManager {
     new Field(SUBSCRIPTION_KEY, BYTES),
     new Field(ASSIGNMENT_KEY, BYTES))
 
+  private val MEMBER_METADATA_V2 = MEMBER_METADATA_V1
+
+  private val OFFSET_COMMIT_VALUE_SCHEMA_V2 = new Schema(new Field("offset", INT64),
+    new Field("metadata", STRING, "Associated metadata.", ""),
+    new Field("commit_timestamp", INT64))
+  private val OFFSET_VALUE_OFFSET_FIELD_V2 = OFFSET_COMMIT_VALUE_SCHEMA_V2.get("offset")
+  private val OFFSET_VALUE_METADATA_FIELD_V2 = OFFSET_COMMIT_VALUE_SCHEMA_V2.get("metadata")
+  private val OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V2 = OFFSET_COMMIT_VALUE_SCHEMA_V2.get("commit_timestamp")
+
+  private val OFFSET_COMMIT_VALUE_SCHEMA_V3 = new Schema(
+    new Field("offset", INT64),
+    new Field("leader_epoch", INT32),
+    new Field("metadata", STRING, "Associated metadata.", ""),
+    new Field("commit_timestamp", INT64))
+  private val OFFSET_VALUE_OFFSET_FIELD_V3 = OFFSET_COMMIT_VALUE_SCHEMA_V3.get("offset")
+  private val OFFSET_VALUE_LEADER_EPOCH_FIELD_V3 = OFFSET_COMMIT_VALUE_SCHEMA_V3.get("leader_epoch")
+  private val OFFSET_VALUE_METADATA_FIELD_V3 = OFFSET_COMMIT_VALUE_SCHEMA_V3.get("metadata")
+  private val OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V3 = OFFSET_COMMIT_VALUE_SCHEMA_V3.get("commit_timestamp")
+
   private val PROTOCOL_TYPE_KEY = "protocol_type"
   private val GENERATION_KEY = "generation"
   private val PROTOCOL_KEY = "protocol"
@@ -388,6 +407,12 @@ object GroupMetadataManager {
     new Field(LEADER_KEY, NULLABLE_STRING),
     new Field(MEMBERS_KEY, new ArrayOf(MEMBER_METADATA_V1)))
 
+  private val GROUP_METADATA_VALUE_SCHEMA_V2 = new Schema(
+    new Field(PROTOCOL_TYPE_KEY, STRING),
+    new Field(GENERATION_KEY, INT32),
+    new Field(PROTOCOL_KEY, NULLABLE_STRING),
+    new Field(LEADER_KEY, NULLABLE_STRING),
+    new Field(MEMBERS_KEY, new ArrayOf(MEMBER_METADATA_V2)))
 
   // map of versions to key schemas as data types
   private val MESSAGE_TYPE_SCHEMAS = Map(
@@ -398,13 +423,18 @@
   // map of version of offset value schemas
   private val OFFSET_VALUE_SCHEMAS = Map(
     0 -> OFFSET_COMMIT_VALUE_SCHEMA_V0,
-    1 -> OFFSET_COMMIT_VALUE_SCHEMA_V1)
+    1 -> OFFSET_COMMIT_VALUE_SCHEMA_V1,
+    2 -> OFFSET_COMMIT_VALUE_SCHEMA_V2,
+    3 -> OFFSET_COMMIT_VALUE_SCHEMA_V3)
+
   private val CURRENT_OFFSET_VALUE_SCHEMA_VERSION = 1.toShort
 
   // map of version of group metadata value schemas
   private val GROUP_VALUE_SCHEMAS = Map(
     0 -> GROUP_METADATA_VALUE_SCHEMA_V0,
-    1 -> GROUP_METADATA_VALUE_SCHEMA_V1)
+    1 -> GROUP_METADATA_VALUE_SCHEMA_V1,
+    2 -> GROUP_METADATA_VALUE_SCHEMA_V2)
+
   private val CURRENT_GROUP_VALUE_SCHEMA_VERSION = 1.toShort
 
   private val CURRENT_OFFSET_KEY_SCHEMA = schemaForKey(CURRENT_OFFSET_KEY_SCHEMA_VERSION)
@@ -545,6 +575,20 @@ object GroupMetadataManager {
         val expireTimestamp = value.get(OFFSET_VALUE_EXPIRE_TIMESTAMP_FIELD_V1).asInstanceOf[Long]
 
         OffsetAndMetadata(offset, metadata, commitTimestamp, expireTimestamp)
+      } else if (version == 2) {
+        val offset = value.get(OFFSET_VALUE_OFFSET_FIELD_V2).asInstanceOf[Long]
+        val metadata = value.get(OFFSET_VALUE_METADATA_FIELD_V2).asInstanceOf[String]
+        val commitTimestamp = value.get(OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V2).asInstanceOf[Long]
+
+        OffsetAndMetadata(offset, metadata, commitTimestamp)
+      } else if (version == 3) {
+        val offset = value.get(OFFSET_VALUE_OFFSET_FIELD_V3).asInstanceOf[Long]
+        val leaderEpoch = value.get(OFFSET_VALUE_LEADER_EPOCH_FIELD_V3).asInstanceOf[Int]
+        val metadata = value.get(OFFSET_VALUE_METADATA_FIELD_V3).asInstanceOf[String]
+        val commitTimestamp = value.get(OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V3).asInstanceOf[Long]
+
+        // val leaderEpochOpt: Optional[Integer] = if (leaderEpoch < 0) Optional.empty() else Optional.of(leaderEpoch)
+        OffsetAndMetadata(offset, metadata, commitTimestamp)
       } else {
         throw new IllegalStateException("Unknown offset message version")
       }
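After a change like this, kafka-manager has to be rebuilt and the new package deployed in place of the old one. With the sbt-based build that the kafka-manager project ships with, that is roughly:

./sbt clean dist
# the packaged zip ends up under target/universal/; unzip it and restart kafka-manager from the new build

The exact output path and zip name may differ depending on the version being built.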

For reference, the complete modified app/kafka/manager/utils/one10/GroupMetadataManager.scala is as follows:

/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package kafka.manager.utils.one10

import java.io.PrintStream

import java.nio.ByteBuffer

import java.nio.charset.StandardCharsets

import java.util.UUID

import kafka.common.{KafkaException, MessageFormatter, OffsetAndMetadata}

import kafka.utils.{Logging, nonthreadsafe}

import org.apache.kafka.clients.consumer.ConsumerRecord

import org.apache.kafka.clients.consumer.internals.{ConsumerProtocol, PartitionAssignor}

import org.apache.kafka.common.TopicPartition

import org.apache.kafka.common.protocol.types.Type._

import org.apache.kafka.common.protocol.types._

import org.apache.kafka.common.utils.Utils

import scala.collection.JavaConverters._

import scala.collection.{Seq, immutable, mutable, _}

/**

* Case class used to represent group metadata for the ListGroups API

*/

case class GroupOverview(groupId: String,

protocolType: String)

/**

* We cache offset commits along with their commit record offset. This enables us to ensure that the latest offset

* commit is always materialized when we have a mix of transactional and regular offset commits. Without preserving

* information of the commit record offset, compaction of the offsets topic it self may result in the wrong offset commit

* being materialized.

*/

case class CommitRecordMetadataAndOffset(appendedBatchOffset: Option[Long], offsetAndMetadata: OffsetAndMetadata) {

def olderThan(that: CommitRecordMetadataAndOffset) : Boolean = appendedBatchOffset.get < that.appendedBatchOffset.get

}

/**

* Group contains the following metadata:

*

*  Membership metadata:

*  1. Members registered in this group

*  2. Current protocol assigned to the group (e.g. partition assignment strategy for consumers)

*  3. Protocol metadata associated with group members

*

*  State metadata:

*  1. group state

*  2. generation id

*  3. leader id

*/

@nonthreadsafe

class GroupMetadata(val groupId: String

, var protocolType: Option[String]

, var generationId: Int

, var protocol: Option[String]

, var leaderId: Option[String])

extends Logging {

private val members =  new mutable.HashMap[String, MemberMetadata]

private val offsets =  new mutable.HashMap[TopicPartition, CommitRecordMetadataAndOffset]

private val pendingOffsetCommits =  new mutable.HashMap[TopicPartition, OffsetAndMetadata]

private val pendingTransactionalOffsetCommits =  new mutable.HashMap[Long, mutable.Map[TopicPartition, CommitRecordMetadataAndOffset]]()

private var receivedTransactionalOffsetCommits =  false

private var receivedConsumerOffsetCommits =  false

var newMemberAdded: Boolean =  false

def has(memberId: String) = members.contains(memberId)

def get(memberId: String) = members(memberId)

def isLeader(memberId: String): Boolean = leaderId.contains(memberId)

def leaderOrNull: String = leaderId.orNull

def protocolOrNull: String = protocol.orNull

def add(member: MemberMetadata) {

if (members.isEmpty)

this .protocolType = Some(member.protocolType)

assert (groupId == member.groupId)

assert ( this .protocolType.orNull == member.protocolType)

assert (supportsProtocols(member.protocols))

if (leaderId.isEmpty)

leaderId = Some(member.memberId)

members.put(member.memberId, member)

}

def remove(memberId: String) {

members.remove(memberId)

if (isLeader(memberId)) {

leaderId = if (members.isEmpty) {

None

} else {

Some(members.keys.head)

}

}

}

def allMembers = members.keySet

def allMemberMetadata = members.values.toList

//  TODO: decide if ids should be predictable or random
def generateMemberIdSuffix = UUID.randomUUID().toString

private def candidateProtocols = {

//  get the set of protocols that are commonly supported by all members
allMemberMetadata

.map(_.protocols)

.reduceLeft((commonProtocols, protocols) => commonProtocols & protocols)

}

def supportsProtocols(memberProtocols: Set[String]) = {

members.isEmpty || (memberProtocols & candidateProtocols).nonEmpty

}

def overview: GroupOverview = {

GroupOverview(groupId, protocolType.getOrElse(""))

}

def initializeOffsets(offsets: collection.Map[TopicPartition, CommitRecordMetadataAndOffset],

pendingTxnOffsets: Map[Long, mutable.Map[TopicPartition, CommitRecordMetadataAndOffset]]) {

this .offsets ++= offsets

this .pendingTransactionalOffsetCommits ++= pendingTxnOffsets

}

def onOffsetCommitAppend(topicPartition: TopicPartition, offsetWithCommitRecordMetadata: CommitRecordMetadataAndOffset) {

if (pendingOffsetCommits.contains(topicPartition)) {

if (offsetWithCommitRecordMetadata.appendedBatchOffset.isEmpty)

throw new IllegalStateException("Cannot complete offset commit write without providing the metadata of the record " +

"in the log.")

if (!offsets.contains(topicPartition) || offsets(topicPartition).olderThan(offsetWithCommitRecordMetadata))

offsets.put(topicPartition, offsetWithCommitRecordMetadata)

}

pendingOffsetCommits.get(topicPartition) match {

case Some(stagedOffset)  if offsetWithCommitRecordMetadata.offsetAndMetadata == stagedOffset =>

pendingOffsetCommits.remove(topicPartition)

case _ =>

// The pendingOffsetCommits for this partition could be empty if the topic was deleted, in which case
//  its entries would be removed from the cache by the `removeOffsets` method.
}

}

def failPendingOffsetWrite(topicPartition: TopicPartition, offset: OffsetAndMetadata): Unit = {

pendingOffsetCommits.get(topicPartition) match {

case Some(pendingOffset)  if offset == pendingOffset => pendingOffsetCommits.remove(topicPartition)

case _ =>

}

}

def prepareOffsetCommit(offsets: Map[TopicPartition, OffsetAndMetadata]) {

receivedConsumerOffsetCommits =  true

pendingOffsetCommits ++= offsets

}

def prepareTxnOffsetCommit(producerId: Long, offsets: Map[TopicPartition, OffsetAndMetadata]) {

trace(s"TxnOffsetCommit for producer $producerId and group $groupId with offsets $offsets is pending")

receivedTransactionalOffsetCommits =  true

val producerOffsets = pendingTransactionalOffsetCommits.getOrElseUpdate(producerId,

mutable.Map.empty[TopicPartition, CommitRecordMetadataAndOffset])

offsets.foreach {  case (topicPartition, offsetAndMetadata) =>

producerOffsets.put(topicPartition, CommitRecordMetadataAndOffset(None, offsetAndMetadata))

}

}

def hasReceivedConsistentOffsetCommits : Boolean = {

!receivedConsumerOffsetCommits || !receivedTransactionalOffsetCommits

}

/*
* Remove a pending transactional offset commit if the actual offset commit record was not written to the log.
* We will return an error and the client will retry the request, potentially to a different coordinator.
*/

def failPendingTxnOffsetCommit(producerId: Long, topicPartition: TopicPartition): Unit = {

pendingTransactionalOffsetCommits.get(producerId) match {

case Some(pendingOffsets) =>

val pendingOffsetCommit = pendingOffsets.remove(topicPartition)

trace(s"TxnOffsetCommit for producer $producerId and group $groupId with offsets $pendingOffsetCommit failed " +

s"to be appended to the log")

if (pendingOffsets.isEmpty)

pendingTransactionalOffsetCommits.remove(producerId)

case _ =>

//  We may hit this case if the partition in question has emigrated already.
}

}

def onTxnOffsetCommitAppend(producerId: Long, topicPartition: TopicPartition,

commitRecordMetadataAndOffset: CommitRecordMetadataAndOffset) {

pendingTransactionalOffsetCommits.get(producerId) match {

case Some(pendingOffset) =>

if (pendingOffset.contains(topicPartition)

&& pendingOffset(topicPartition).offsetAndMetadata == commitRecordMetadataAndOffset.offsetAndMetadata)

pendingOffset.update(topicPartition, commitRecordMetadataAndOffset)

case _ =>

//  We may hit this case if the partition in question has emigrated.
}

}

/*
* Complete a pending transactional offset commit. This is called after a commit or abort marker is fully written
* to the log.
*/

def completePendingTxnOffsetCommit(producerId: Long, isCommit: Boolean): Unit = {

val pendingOffsetsOpt = pendingTransactionalOffsetCommits.remove(producerId)

if (isCommit) {

pendingOffsetsOpt.foreach { pendingOffsets =>

pendingOffsets.foreach {  case (topicPartition, commitRecordMetadataAndOffset) =>

if (commitRecordMetadataAndOffset.appendedBatchOffset.isEmpty)

throw new IllegalStateException(s"Trying to complete a transactional offset commit for producerId $producerId " +

s"and groupId $groupId even though the offset commit record itself hasn't been appended to the log.")

val currentOffsetOpt = offsets.get(topicPartition)

if (currentOffsetOpt.forall(_.olderThan(commitRecordMetadataAndOffset))) {

trace(s"TxnOffsetCommit for producer $producerId and group $groupId with offset $commitRecordMetadataAndOffset " +

"committed and loaded into the cache.")

offsets.put(topicPartition, commitRecordMetadataAndOffset)

} else {

trace(s"TxnOffsetCommit for producer $producerId and group $groupId with offset $commitRecordMetadataAndOffset " +

s"committed, but not loaded since its offset is older than current offset $currentOffsetOpt.")

}

}

}

} else {

trace(s"TxnOffsetCommit for producer $producerId and group $groupId with offsets $pendingOffsetsOpt aborted")

}

}

def activeProducers = pendingTransactionalOffsetCommits.keySet

def hasPendingOffsetCommitsFromProducer(producerId: Long) =

pendingTransactionalOffsetCommits.contains(producerId)

def removeOffsets(topicPartitions: Seq[TopicPartition]): immutable.Map[TopicPartition, OffsetAndMetadata] = {

topicPartitions.flatMap { topicPartition =>

pendingOffsetCommits.remove(topicPartition)

pendingTransactionalOffsetCommits.foreach {  case (_, pendingOffsets) =>

pendingOffsets.remove(topicPartition)

}

val removedOffset = offsets.remove(topicPartition)

removedOffset.map(topicPartition -> _.offsetAndMetadata)

}.toMap

}

def removeExpiredOffsets(startMs: Long) : Map[TopicPartition, OffsetAndMetadata] = {

val expiredOffsets = offsets

.filter {

case (topicPartition, commitRecordMetadataAndOffset) =>

commitRecordMetadataAndOffset.offsetAndMetadata.expireTimestamp < startMs && !pendingOffsetCommits.contains(topicPartition)

}

.map {

case (topicPartition, commitRecordOffsetAndMetadata) =>

(topicPartition, commitRecordOffsetAndMetadata.offsetAndMetadata)

}

offsets --= expiredOffsets.keySet

expiredOffsets.toMap

}

def allOffsets = offsets.map {  case (topicPartition, commitRecordMetadataAndOffset) =>

(topicPartition, commitRecordMetadataAndOffset.offsetAndMetadata)

}.toMap

def offset(topicPartition: TopicPartition): Option[OffsetAndMetadata] = offsets.get(topicPartition).map(_.offsetAndMetadata)

//  visible for testing
def offsetWithRecordMetadata(topicPartition: TopicPartition): Option[CommitRecordMetadataAndOffset] = offsets.get(topicPartition)

def numOffsets = offsets.size

def hasOffsets = offsets.nonEmpty || pendingOffsetCommits.nonEmpty || pendingTransactionalOffsetCommits.nonEmpty

override def toString: String = {

"GroupMetadata(" +

s"groupId=$groupId, " +

s"generation=$generationId, " +

s"protocolType=$protocolType, " +

s"members=$members)"

}

}

/**

* Messages stored for the group topic has versions for both the key and value fields. Key

* version is used to indicate the type of the message (also to differentiate different types

* of messages from being compacted together if they have the same field values); and value

* version is used to evolve the messages within their data types:

*

* key version 0:       group consumption offset

*    -> value version 0:       [offset, metadata, timestamp]

*

* key version 1:       group consumption offset

*    -> value version 1:       [offset, metadata, commit_timestamp, expire_timestamp]

*

* key version 2:       group metadata

*     -> value version 0:       [protocol_type, generation, protocol, leader, members]

*/

object GroupMetadataManager {

private val CURRENT_OFFSET_KEY_SCHEMA_VERSION = 1.toShort

private val CURRENT_GROUP_KEY_SCHEMA_VERSION = 2.toShort

private val CURRENT_GROUP_KEY_SCHEMA_VERSION2 = 3.toShort

private val OFFSET_COMMIT_KEY_SCHEMA =  new Schema( new Field("group", STRING),

new Field("topic", STRING),

new Field("partition", INT32))

private val OFFSET_KEY_GROUP_FIELD = OFFSET_COMMIT_KEY_SCHEMA.get("group")

private val OFFSET_KEY_TOPIC_FIELD = OFFSET_COMMIT_KEY_SCHEMA.get("topic")

private val OFFSET_KEY_PARTITION_FIELD = OFFSET_COMMIT_KEY_SCHEMA.get("partition")

private val OFFSET_COMMIT_VALUE_SCHEMA_V0 =  new Schema( new Field("offset", INT64),

new Field("metadata", STRING, "Associated metadata.", ""),

new Field("timestamp", INT64))

private val OFFSET_VALUE_OFFSET_FIELD_V0 = OFFSET_COMMIT_VALUE_SCHEMA_V0.get("offset")

private val OFFSET_VALUE_METADATA_FIELD_V0 = OFFSET_COMMIT_VALUE_SCHEMA_V0.get("metadata")

private val OFFSET_VALUE_TIMESTAMP_FIELD_V0 = OFFSET_COMMIT_VALUE_SCHEMA_V0.get("timestamp")

private val OFFSET_COMMIT_VALUE_SCHEMA_V1 =  new Schema( new Field("offset", INT64),

new Field("metadata", STRING, "Associated metadata.", ""),

new Field("commit_timestamp", INT64),

new Field("expire_timestamp", INT64))

private val OFFSET_VALUE_OFFSET_FIELD_V1 = OFFSET_COMMIT_VALUE_SCHEMA_V1.get("offset")

private val OFFSET_VALUE_METADATA_FIELD_V1 = OFFSET_COMMIT_VALUE_SCHEMA_V1.get("metadata")

private val OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V1 = OFFSET_COMMIT_VALUE_SCHEMA_V1.get("commit_timestamp")

private val OFFSET_VALUE_EXPIRE_TIMESTAMP_FIELD_V1 = OFFSET_COMMIT_VALUE_SCHEMA_V1.get("expire_timestamp")

// new add for version
private val OFFSET_COMMIT_VALUE_SCHEMA_V2 =  new Schema( new Field("offset", INT64),

new Field("metadata", STRING, "Associated metadata.", ""),

new Field("commit_timestamp", INT64))

private val OFFSET_VALUE_OFFSET_FIELD_V2 = OFFSET_COMMIT_VALUE_SCHEMA_V2.get("offset")

private val OFFSET_VALUE_METADATA_FIELD_V2 = OFFSET_COMMIT_VALUE_SCHEMA_V2.get("metadata")

private val OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V2 = OFFSET_COMMIT_VALUE_SCHEMA_V2.get("commit_timestamp")

private val OFFSET_COMMIT_VALUE_SCHEMA_V3 =  new Schema(

new Field("offset", INT64),

new Field("leader_epoch", INT32),

new Field("metadata", STRING, "Associated metadata.", ""),

new Field("commit_timestamp", INT64))

private val OFFSET_VALUE_OFFSET_FIELD_V3 = OFFSET_COMMIT_VALUE_SCHEMA_V3.get("offset")

private val OFFSET_VALUE_LEADER_EPOCH_FIELD_V3 = OFFSET_COMMIT_VALUE_SCHEMA_V3.get("leader_epoch")

private val OFFSET_VALUE_METADATA_FIELD_V3 = OFFSET_COMMIT_VALUE_SCHEMA_V3.get("metadata")

private val OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V3 = OFFSET_COMMIT_VALUE_SCHEMA_V3.get("commit_timestamp")

// new add for version 3-end

private val GROUP_METADATA_KEY_SCHEMA =  new Schema( new Field("group", STRING))

private val GROUP_KEY_GROUP_FIELD = GROUP_METADATA_KEY_SCHEMA.get("group")

private val MEMBER_ID_KEY = "member_id"

private val CLIENT_ID_KEY = "client_id"

private val CLIENT_HOST_KEY = "client_host"

private val REBALANCE_TIMEOUT_KEY = "rebalance_timeout"

private val SESSION_TIMEOUT_KEY = "session_timeout"

private val SUBSCRIPTION_KEY = "subscription"

private val ASSIGNMENT_KEY = "assignment"

private val MEMBER_METADATA_V0 =  new Schema(

new Field(MEMBER_ID_KEY, STRING),

new Field(CLIENT_ID_KEY, STRING),

new Field(CLIENT_HOST_KEY, STRING),

new Field(SESSION_TIMEOUT_KEY, INT32),

new Field(SUBSCRIPTION_KEY, BYTES),

new Field(ASSIGNMENT_KEY, BYTES))

private val MEMBER_METADATA_V1 =  new Schema(

new Field(MEMBER_ID_KEY, STRING),

new Field(CLIENT_ID_KEY, STRING),

new Field(CLIENT_HOST_KEY, STRING),

new Field(REBALANCE_TIMEOUT_KEY, INT32),

new Field(SESSION_TIMEOUT_KEY, INT32),

new Field(SUBSCRIPTION_KEY, BYTES),

new Field(ASSIGNMENT_KEY, BYTES))

// new add for version
private val MEMBER_METADATA_V2 = MEMBER_METADATA_V1

private val PROTOCOL_TYPE_KEY = "protocol_type"

private val GENERATION_KEY = "generation"

private val PROTOCOL_KEY = "protocol"

private val LEADER_KEY = "leader"

private val MEMBERS_KEY = "members"

private val GROUP_METADATA_VALUE_SCHEMA_V0 =  new Schema(

new Field(PROTOCOL_TYPE_KEY, STRING),

new Field(GENERATION_KEY, INT32),

new Field(PROTOCOL_KEY, NULLABLE_STRING),

new Field(LEADER_KEY, NULLABLE_STRING),

new Field(MEMBERS_KEY,  new ArrayOf(MEMBER_METADATA_V0)))

private val GROUP_METADATA_VALUE_SCHEMA_V1 =  new Schema(

new Field(PROTOCOL_TYPE_KEY, STRING),

new Field(GENERATION_KEY, INT32),

new Field(PROTOCOL_KEY, NULLABLE_STRING),

new Field(LEADER_KEY, NULLABLE_STRING),

new Field(MEMBERS_KEY,  new ArrayOf(MEMBER_METADATA_V1)))

private val GROUP_METADATA_VALUE_SCHEMA_V2 =  new Schema(

new Field(PROTOCOL_TYPE_KEY, STRING),

new Field(GENERATION_KEY, INT32),

new Field(PROTOCOL_KEY, NULLABLE_STRING),

new Field(LEADER_KEY, NULLABLE_STRING),

new Field(MEMBERS_KEY,  new ArrayOf(MEMBER_METADATA_V2)))

//  map of versions to key schemas as data types
private val MESSAGE_TYPE_SCHEMAS = Map(

0 -> OFFSET_COMMIT_KEY_SCHEMA,

1 -> OFFSET_COMMIT_KEY_SCHEMA,

2 -> GROUP_METADATA_KEY_SCHEMA

)

//  map of version of offset value schemas
private val OFFSET_VALUE_SCHEMAS = Map(

0 -> OFFSET_COMMIT_VALUE_SCHEMA_V0,

1 -> OFFSET_COMMIT_VALUE_SCHEMA_V1,

2 -> OFFSET_COMMIT_VALUE_SCHEMA_V2,

3 -> OFFSET_COMMIT_VALUE_SCHEMA_V3

)

private val CURRENT_OFFSET_VALUE_SCHEMA_VERSION = 1.toShort

//  map of version of group metadata value schemas
private val GROUP_VALUE_SCHEMAS = Map(

0 -> GROUP_METADATA_VALUE_SCHEMA_V0,

1 -> GROUP_METADATA_VALUE_SCHEMA_V1,

2 -> GROUP_METADATA_VALUE_SCHEMA_V2)

private val CURRENT_GROUP_VALUE_SCHEMA_VERSION = 1.toShort

private val CURRENT_OFFSET_KEY_SCHEMA = schemaForKey(CURRENT_OFFSET_KEY_SCHEMA_VERSION)

private val CURRENT_GROUP_KEY_SCHEMA = schemaForKey(CURRENT_GROUP_KEY_SCHEMA_VERSION)

private val CURRENT_OFFSET_VALUE_SCHEMA = schemaForOffset(CURRENT_OFFSET_VALUE_SCHEMA_VERSION)

private val CURRENT_GROUP_VALUE_SCHEMA = schemaForGroup(CURRENT_GROUP_VALUE_SCHEMA_VERSION)

private def schemaForKey(version: Int) = {

val schemaOpt = MESSAGE_TYPE_SCHEMAS.get(version)

schemaOpt match {

case Some(schema) => schema

case _ =>  throw new KafkaException("Unknown offset schema version " + version)

}

}

private def schemaForOffset(version: Int) = {

val schemaOpt = OFFSET_VALUE_SCHEMAS.get(version)

println("version is:"+version+", schemaOpt is: "+schemaOpt)

schemaOpt match {

case Some(schema) => schema

case _ =>  throw new KafkaException("Unknown offset schema version " + version)

}

}

private def schemaForGroup(version: Int) = {

val schemaOpt = GROUP_VALUE_SCHEMAS.get(version)

schemaOpt match {

case Some(schema) => schema

case _ =>  throw new KafkaException("Unknown group metadata version " + version)

}

}

/**

* Generates the key for offset commit message for given (group, topic, partition)

*

@return

key for offset commit message

*/

def offsetCommitKey(group: String, topicPartition: TopicPartition,

versionId: Short = 0): Array[Byte] = {

val key =  new Struct(CURRENT_OFFSET_KEY_SCHEMA)

key.set(OFFSET_KEY_GROUP_FIELD, group)

key.set(OFFSET_KEY_TOPIC_FIELD, topicPartition.topic)

key.set(OFFSET_KEY_PARTITION_FIELD, topicPartition.partition)

val byteBuffer = ByteBuffer.allocate(2  /*  version  */ + key.sizeOf)

byteBuffer.putShort(CURRENT_OFFSET_KEY_SCHEMA_VERSION)

key.writeTo(byteBuffer)

byteBuffer.array()

}

/**

* Generates the key for group metadata message for given group

*

@return

key bytes for group metadata message

*/

def groupMetadataKey(group: String): Array[Byte] = {

val key =  new Struct(CURRENT_GROUP_KEY_SCHEMA)

key.set(GROUP_KEY_GROUP_FIELD, group)

val byteBuffer = ByteBuffer.allocate(2  /*  version  */ + key.sizeOf)

byteBuffer.putShort(CURRENT_GROUP_KEY_SCHEMA_VERSION)

key.writeTo(byteBuffer)

byteBuffer.array()

}

/**

* Generates the payload for offset commit message from given offset and metadata

*

@param

offsetAndMetadata consumer's current offset and metadata

@return

payload for offset commit message

*/

def offsetCommitValue(offsetAndMetadata: OffsetAndMetadata): Array[Byte] = {

//  generate commit value with schema version 1
val value =  new Struct(CURRENT_OFFSET_VALUE_SCHEMA)

value.set(OFFSET_VALUE_OFFSET_FIELD_V1, offsetAndMetadata.offset)

value.set(OFFSET_VALUE_METADATA_FIELD_V1, offsetAndMetadata.metadata)

value.set(OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V1, offsetAndMetadata.commitTimestamp)

value.set(OFFSET_VALUE_EXPIRE_TIMESTAMP_FIELD_V1, offsetAndMetadata.expireTimestamp)

val byteBuffer = ByteBuffer.allocate(2  /*  version  */ + value.sizeOf)

byteBuffer.putShort(CURRENT_OFFSET_VALUE_SCHEMA_VERSION)

value.writeTo(byteBuffer)

byteBuffer.array()

}

/**

* Decodes the offset messages' key

*

@param

buffer input byte-buffer

@return

an GroupTopicPartition object

*/

def readMessageKey(buffer: ByteBuffer): BaseKey = {

val version = buffer.getShort

val keySchema = schemaForKey(version)

val key = keySchema.read(buffer)

if (version <= CURRENT_OFFSET_KEY_SCHEMA_VERSION) {

//  version 0 and 1 refer to offset
val group = key.get(OFFSET_KEY_GROUP_FIELD).asInstanceOf[String]

val topic = key.get(OFFSET_KEY_TOPIC_FIELD).asInstanceOf[String]

val partition = key.get(OFFSET_KEY_PARTITION_FIELD).asInstanceOf[Int]

OffsetKey(version, GroupTopicPartition(group,  new TopicPartition(topic, partition)))

} else if (version == CURRENT_GROUP_KEY_SCHEMA_VERSION) {

//  version 2 refers to offset
val group = key.get(GROUP_KEY_GROUP_FIELD).asInstanceOf[String]

GroupMetadataKey(version, group)

} else if (version == CURRENT_GROUP_KEY_SCHEMA_VERSION2) { // new add

//  version 3 also refers to group metadata
val group = key.get(GROUP_KEY_GROUP_FIELD).asInstanceOf[String]

GroupMetadataKey(version, group)

} else {

throw new IllegalStateException("Unknown version " + version + " for group metadata message")

}

}

/**

* Decodes the offset messages' payload and retrieves offset and metadata from it

*

@param

buffer input byte-buffer

@return

an offset-metadata object from the message

*/

def readOffsetMessageValue(buffer: ByteBuffer): OffsetAndMetadata = {

if (buffer ==  null ) {  //  tombstone
null

} else {

val version = buffer.getShort

val valueSchema = schemaForOffset(version)

val value = valueSchema.read(buffer)

if (version == 0) {

val offset = value.get(OFFSET_VALUE_OFFSET_FIELD_V0).asInstanceOf[Long]

val metadata = value.get(OFFSET_VALUE_METADATA_FIELD_V0).asInstanceOf[String]

val timestamp = value.get(OFFSET_VALUE_TIMESTAMP_FIELD_V0).asInstanceOf[Long]

OffsetAndMetadata(offset, metadata, timestamp)

} else if (version == 1) {

val offset = value.get(OFFSET_VALUE_OFFSET_FIELD_V1).asInstanceOf[Long]

val metadata = value.get(OFFSET_VALUE_METADATA_FIELD_V1).asInstanceOf[String]

val commitTimestamp = value.get(OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V1).asInstanceOf[Long]

val expireTimestamp = value.get(OFFSET_VALUE_EXPIRE_TIMESTAMP_FIELD_V1).asInstanceOf[Long]

OffsetAndMetadata(offset, metadata, commitTimestamp, expireTimestamp)

} else if (version == 2) {

val offset = value.get(OFFSET_VALUE_OFFSET_FIELD_V2).asInstanceOf[Long]

val metadata = value.get(OFFSET_VALUE_METADATA_FIELD_V2).asInstanceOf[String]

val commitTimestamp = value.get(OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V2).asInstanceOf[Long]

OffsetAndMetadata(offset, metadata, commitTimestamp)

} else if (version == 3) {

val offset = value.get(OFFSET_VALUE_OFFSET_FIELD_V3).asInstanceOf[Long]

val leaderEpoch = value.get(OFFSET_VALUE_LEADER_EPOCH_FIELD_V3).asInstanceOf[Int]

val metadata = value.get(OFFSET_VALUE_METADATA_FIELD_V3).asInstanceOf[String]

val commitTimestamp = value.get(OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V3).asInstanceOf[Long]

OffsetAndMetadata(offset, metadata, commitTimestamp)

} else {

throw new IllegalStateException("Unknown offset message version")

}

}

}

/**

* Decodes the group metadata messages' payload and retrieves its member metadatafrom it

*

@param

buffer input byte-buffer

@return

a group metadata object from the message

*/

def readGroupMessageValue(groupId: String, buffer: ByteBuffer): GroupMetadata = {

if (buffer ==  null ) {  //  tombstone
null

} else {

val version = buffer.getShort

val valueSchema = schemaForGroup(version)

val value = valueSchema.read(buffer)

if (version == 0 || version == 1) {

val generationId = value.get(GENERATION_KEY).asInstanceOf[Int]

val protocolType = value.get(PROTOCOL_TYPE_KEY).asInstanceOf[String]

val protocol = value.get(PROTOCOL_KEY).asInstanceOf[String]

val leaderId = value.get(LEADER_KEY).asInstanceOf[String]

val memberMetadataArray = value.getArray(MEMBERS_KEY)

val members = memberMetadataArray.map { memberMetadataObj =>

val memberMetadata = memberMetadataObj.asInstanceOf[Struct]

val memberId = memberMetadata.get(MEMBER_ID_KEY).asInstanceOf[String]

val clientId = memberMetadata.get(CLIENT_ID_KEY).asInstanceOf[String]

val clientHost = memberMetadata.get(CLIENT_HOST_KEY).asInstanceOf[String]

val subscription: PartitionAssignor.Subscription = ConsumerProtocol.deserializeSubscription(memberMetadata.get(SUBSCRIPTION_KEY).asInstanceOf[ByteBuffer])

val assignment: PartitionAssignor.Assignment = ConsumerProtocol.deserializeAssignment(memberMetadata.get(ASSIGNMENT_KEY).asInstanceOf[ByteBuffer])

val member =  new MemberMetadata(memberId

, groupId

, clientId

, clientHost

, protocolType

, List((protocol, subscription.topics().asScala.toSet))

, assignment.partitions().asScala.map(tp => (tp.topic(), tp.partition())).toSet)

member

}

val finalProtocolType =  if (protocolType ==  null || protocolType.isEmpty) None  else Some(protocolType)

val group =  new GroupMetadata(groupId = groupId

, generationId = generationId

, protocolType = finalProtocolType

, protocol = Option(protocol)

, leaderId = Option(leaderId)

)

members.foreach(group.add)

group

} else {

throw new IllegalStateException("Unknown group metadata message version")

}

}

}

// Formatter for use with tools such as console consumer: Consumer should also set exclude.internal.topics to false.
//  (specify --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" when consuming __consumer_offsets)
class OffsetsMessageFormatter  extends MessageFormatter {

def writeTo(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]], output: PrintStream) {

Option(consumerRecord.key).map(key => GroupMetadataManager.readMessageKey(ByteBuffer.wrap(key))).foreach {

// Only print if the message is an offset record.
//  We ignore the timestamp of the message because GroupMetadataMessage has its own timestamp.
case offsetKey: OffsetKey =>

val groupTopicPartition = offsetKey.key

val value = consumerRecord.value

val formattedValue =

if (value ==  null ) "NULL"

else GroupMetadataManager.readOffsetMessageValue(ByteBuffer.wrap(value)).toString

output.write(groupTopicPartition.toString.getBytes(StandardCharsets.UTF_8))

output.write("::".getBytes(StandardCharsets.UTF_8))

output.write(formattedValue.getBytes(StandardCharsets.UTF_8))

output.write("\n".getBytes(StandardCharsets.UTF_8))

case _ =>  //  no-op
}

}

}

//  Formatter for use with tools to read group metadata history
class GroupMetadataMessageFormatter  extends MessageFormatter {

def writeTo(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]], output: PrintStream) {

Option(consumerRecord.key).map(key => GroupMetadataManager.readMessageKey(ByteBuffer.wrap(key))).foreach {

// Only print if the message is a group metadata record.
//  We ignore the timestamp of the message because GroupMetadataMessage has its own timestamp.
case groupMetadataKey: GroupMetadataKey =>

val groupId = groupMetadataKey.key

val value = consumerRecord.value

val formattedValue =

if (value ==  null ) "NULL"

else GroupMetadataManager.readGroupMessageValue(groupId, ByteBuffer.wrap(value)).toString

output.write(groupId.getBytes(StandardCharsets.UTF_8))

output.write("::".getBytes(StandardCharsets.UTF_8))

output.write(formattedValue.getBytes(StandardCharsets.UTF_8))

output.write("\n".getBytes(StandardCharsets.UTF_8))

case _ =>  //  no-op
}

}

}

}

case class GroupTopicPartition(group: String, topicPartition: TopicPartition) {

def  this (group: String, topic: String, partition: Int) =

this (group,  new TopicPartition(topic, partition))

override def toString: String =

"[%s,%s,%d]".format(group, topicPartition.topic, topicPartition.partition)

}

trait BaseKey{

def version: Short

def key: Any

}

case class OffsetKey(version: Short, key: GroupTopicPartition)  extends BaseKey {

override def toString: String = key.toString

}

case class GroupMetadataKey(version: Short, key: String)  extends BaseKey {

override def toString: String = key

}


That is all for this post. I hope it helps anyone who runs into the same problem.
