使用Spring Boot和Kafka Streams实现CQRS

栏目: Java · 发布时间: 6年前

内容简介:本文是David Romero一篇Spring + Kafka Stream实现CQRS的案例代码:去年九月,我的同事伊万·古铁雷斯和我谈到我们cowokers如何实现事件与Kafka Stream,我开发了一个Kafka Stream,它读取包含来自Twitter的“Java”字样的推文,按用户名分组推文,并选择最喜欢的推文。管道结束端将重新选择的信息再发送到PostgreSQL由于我们收到了积极的反馈,并且我们学到了很多东西,所以我想分享这个演示,以便任何想要看一眼的人都可以使用它。

本文是David Romero一篇Spring + Kafka Stream实现CQRS的案例代码:

去年九月,我的同事伊万·古铁雷斯和我谈到我们cowokers如何实现事件与Kafka Stream,我开发了一个Kafka Stream,它读取包含来自Twitter的“Java”字样的推文,按用户名分组推文,并选择最喜欢的推文。管道结束端将重新选择的信息再发送到PostgreSQL

由于我们收到了积极的反馈,并且我们学到了很多东西,所以我想分享这个演示,以便任何想要看一眼的人都可以使用它。

2.实施

该演示是基于Kafka和Kafka Streams 的CQRS模式的实现。Kafka能够解耦read(Query)和write(Command)操作,这有助于我们更快地开发事件源应用。

堆栈

整个堆栈已在 Docker 中实现,因为它集成了多个 工具 及其隔离级别时的简单性。堆栈由

version: '3.1'
services:

#############
# Kafka
#############
zookeeper:
image: confluentinc/cp-zookeeper
container_name: zookeeper
network_mode: host
ports:
- "2181:2181"
environment:
ZOOKEEPER_CLIENT_PORT: 2181

kafka:
image: confluentinc/cp-kafka
container_name: kafka
network_mode: host
depends_on:
- zookeeper
ports:
-
"9092:9092"
environment:
KAFKA_ZOOKEEPER_CONNECT: localhost:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT:
//localhost:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 # We have only 1 broker, so offsets topic can only have one replication factor.

connect:
image: confluentinc/cp-kafka-connect
container_name: kafka-connect
network_mode: host
ports:
-
"8083:8083"
depends_on:
- zookeeper
- kafka
volumes:
- $PWD/connect-plugins:/etc/kafka-connect/jars # in this volume is located the postgre driver.
environment:
CONNECT_BOOTSTRAP_SERVERS: kafka:9092
CONNECT_REST_PORT: 8083 # Kafka connect creates an endpoint in order to add connectors
CONNECT_REST_ADVERTISED_HOST_NAME:
"kafka-connect"
CONNECT_GROUP_ID: kafka-connect
CONNECT_ZOOKEEPER_CONNECT: zookeeper:2181
CONNECT_CONFIG_STORAGE_TOPIC: kafka-connect-config
CONNECT_OFFSET_STORAGE_TOPIC: kafka-connect-offsets
CONNECT_STATUS_STORAGE_TOPIC: kafka-connect-status
CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1 # We have only 1 broker, so we can only have 1 replication factor.
CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
CONNECT_KEY_CONVERTER:
"org.apache.kafka.connect.storage.StringConverter" # We receive a string as key and a json as value
CONNECT_VALUE_CONVERTER:
"org.apache.kafka.connect.json.JsonConverter"
CONNECT_INTERNAL_KEY_CONVERTER:
"org.apache.kafka.connect.storage.StringConverter"
CONNECT_INTERNAL_VALUE_CONVERTER:
"org.apache.kafka.connect.json.JsonConverter"
CONNECT_PLUGIN_PATH: /usr/share/java,/etc/kafka-connect/jars

#############
# PostgreSQL
#############
db:
container_name: postgresql
network_mode: host
image: postgres
restart: always
ports:
-
"5432:5432"
environment:
POSTGRES_DB: influencers
POSTGRES_USER: user
POSTGRES_PASSWORD: 1234

上面配置在 docker-compose 文件中,也包含此演示中涉及的所有工具:

1. Zookeper:卡夫卡不可分割的合作伙伴。

2. Kafka:主要角色。你需要设置zookeeper ip。

3. Kafka Connector:4个主要的Kafka核心API之一。它负责读取所提供topic的记录并将其插入PostgreSQL。

4. PostgreSQL:SQL数据库。

生产者

这是写入数据到Kafka的应用程序。我们的基础设施负责阅读Twitter中包含“Java”字样的推文并将其发送给Kafka。

以下代码包含两个部分:Twitter Stream和Kafka Producer。

Twitter Stream:创建推文数据流。如果要在消费之前过滤流,也可以添加FilterQuery。您需要凭据才能访问Twitter API。

Kafka Producer:它将记录发送给kafka。在我们的演示中,它将没有key的记录发送到“推文”主题。

@SpringBootApplication
@Slf4j
public class DemoTwitterKafkaProducerApplication {

public static void main(String[] args) {

// Kafka config
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
"localhost:9092"); //Kafka cluster hosts.
properties.put(ProducerConfig.CLIENT_ID_CONFIG,
"demo-twitter-kafka-application-producer"); // Group id
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
Producer<String, String> producer = new KafkaProducer<>(properties);

// Twitter Stream
final TwitterStream twitterStream = new TwitterStreamFactory().getInstance();
final StatusListener listener = new StatusListener() {
public void onStatus(Status status) {
final long likes = getLikes(status);
final String tweet = status.getText();
final String content = status.getUser().getName() +
"::::" + tweet + "::::" + likes;
log.info(content);
producer.send(new ProducerRecord<>(
"tweets", content));
}

//Some methods have been omitted for simplicity.

private long getLikes(Status status) {
return status.getRetweetedStatus() != null ? status.getRetweetedStatus().getFavoriteCount() : 0;
// Likes can be null.
}
};
twitterStream.addListener(listener);
final FilterQuery tweetFilterQuery = new FilterQuery();
tweetFilterQuery.track(new String[] {
"Java" });
twitterStream.filter(tweetFilterQuery);
SpringApplication.run(DemoTwitterKafkaProducerApplication.class, args);
Runtime.getRuntime().addShutdownHook(new Thread(() -> producer.close()));
//Kafka producer should close when application finishes.
}
}

这个应用程序是一个Spring Boot应用程序。

Kafka Stream

我们基础设施的主要部分负责从“tweets”主题topic中阅读推文,按用户名分组,计算推文,提取最喜欢的推文并将其发送给“influencers”的新主题。

让我们关注下一个代码块的两个最重要的方法:

1. 流方法:Kafka Stream Java API遵循与 Java 8 Stream API相同的命名法。在管道中执行的第一个操作是选择密钥,因为每次密钥改变时,在主题中执行重新分区操作。所以,我们应该尽可能少地改变密钥。然后,我们必须计算大多数人喜欢积累的推文。并且由于此操作是有状态操作,我们需要执行聚合。聚合操作将在以下项目中详述。最后,我们需要将记录发送到名为“influencers”的输出主题。对于此任务,我们需要将Influencer类映射到InfluencerJsonSchema类,然后使用to方法。InfluencerJsonSchema类将在Kafka Connector部分中解释。Peek方法用于调试目的。

2.aggregateInfoToInfluencer方法:这是一个有状态操作。收到三个参数:用户名,主题的原始推文和之前存储的Influencer。在推文计数器中添加一个,并将喜欢与喜欢的推文进行比较。返回Influecer类的新实例,以保持不变性。

@Configuration
@EnableKafkaStreams
static class KafkaConsumerConfiguration {

final Serde<Influencer> jsonSerde = new JsonSerde<>(Influencer.class);
final Materialized<String, Influencer, KeyValueStore<Bytes, byte[]>> materialized = Materialized.<String, Influencer, KeyValueStore<Bytes, byte[]>>as("aggregation-tweets-by-likes").withValueSerde(jsonSerde);

@Bean
KStream<String, String> stream(StreamsBuilder streamBuilder){
final KStream<String, String> stream = streamBuilder.stream(
"tweets");
stream
.selectKey(( key , value ) -> String.valueOf(value.split(
"::::")[0]))
.groupByKey()
.aggregate(Influencer::init, this::aggregateInfoToInfluencer, materialized)
.mapValues(InfluencerJsonSchema::new)
.toStream()
.peek( (username, jsonSchema) -> log.info(
"Sending a new tweet from user: {}", username))
.to(
"influencers", Produced.with(Serdes.String(), new JsonSerde<>(InfluencerJsonSchema.class)));
return stream;
}

private Influencer aggregateInfoToInfluencer(String username, String tweet, Influencer influencer) {
final long likes = Long.valueOf(tweet.split(
"::::")[2]);
if ( likes >= influencer.getLikes() ) {
return new Influencer(influencer.getTweets()+1, username, String.valueOf(tweet.split(
"::::")[1]), likes);
} else {
return new Influencer(influencer.getTweets()+1, username, influencer.getContent(), influencer.getLikes());
}
}

@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public StreamsConfig kStreamsConfigs(KafkaProperties kafkaProperties) {
Map<String, Object> props = new HashMap<>();
props.put(StreamsConfig.APPLICATION_ID_CONFIG,
"demo-twitter-kafka-application");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
return new StreamsConfig(props);
}

}

@EnableKafkaStreams注释和kStreamsConfigs方法负责将Kafka Stream API与Spring Framework集成。

在上面的代码块中提到了Influencer 类,为了便于阅读,这里提供了影响者类的代码:

@RequiredArgsConstructor
@Getter
public static class Influencer {

final long tweets;

final String username;

final String content;

final long likes;

static Influencer init() {
return new Influencer(0, "","", 0);
}

@JsonCreator
static Influencer fromJson(@JsonProperty(
"tweets") long tweetCounts, @JsonProperty("username") String username, @JsonProperty("content") String content, @JsonProperty("likes") long likes) {
return new Influencer(tweetCounts, username, content, likes);
}

}

fromJson由于Kafka Stream使用了其序列化方法,该方法是Kafka强制性要求的。如果您想了解有关此主题的更多信息,请参阅Kafka Stream Serde。

这个应用程序是一个Spring Boot应用程序。

Kafka Connector

一旦我们将数据喂给了我们的新主题“influencers”,我们就必须将数据保存到Postgre。对于此任务,Kafka提供了一个名为Kafka Connect的强大API 。由Apache Kafka开发人员创建的Confluent公司为许多第三方工具开发了多个连接器。对于JDBC,推出两个连接器:source和sink。

1. source源连接器是从jdbc驱动程序读取数据并将数据发送到Kafka。

2. sink接收器连接器从Kafka读取数据并将其发送到jdbc驱动程序。

我们将使用JDBC Sink连接器,此连接器需要schema信息才能将主题记录映射到 sql 记录。在我们的演示中,schema 在主题的记录中提供。因此,我们必须在数据管道中将Influecer类映射到InfluencerJsonSchema类。

在以下代码中,您可以看到schema 的发送方式。

/** * https://gist.github.com/rmoff/2b922fd1f9baf3ba1d66b98e9dd7b364 *  */
@Getter
public class InfluencerJsonSchema {

Schema schema;
Influencer payload;

InfluencerJsonSchema(long tweetCounts, String username, String content, long likes) {
this.payload = new Influencer(tweetCounts, username, content, likes);
Field fieldTweetCounts = Field.builder().field(
"tweets").type("int64").build();
Field fieldContent = Field.builder().field(
"content").type("string").build();
Field fieldUsername = Field.builder().field(
"username").type("string").build();
Field fieldLikes = Field.builder().field(
"likes").type("int64").build();
this.schema = new Schema(
"struct", Arrays.asList(fieldUsername,fieldContent,fieldLikes,fieldTweetCounts));
}

public InfluencerJsonSchema(Influencer influencer) {
this(influencer.getTweets(),influencer.getUsername(),influencer.getContent(),influencer.getLikes());
}

@Getter
@AllArgsConstructor
static class Schema {

String type;
List<Field> fields;

}

@Getter
@Builder
static class Field {

String type;
String field;

}
}

然后,我们需要配置我们的Kafka连接器。应提供源主题,目标表,主键或URL连接。特别提到'insert.mode'字段。我们使用'upsert'模式,因为主键是用户名,因此将根据用户之前是否已经持久来插入或更新记录。

{
"name": "jdbc-sink",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max": "1",
"topics": "influencers",
"table.name.format": "influencer",
"connection.url": "jdbc:postgresql://postgresql:5432/influencers?user=user&password=1234",
"auto.create": "true",
"insert.mode": "upsert",
"pk.fields": "username",
"pk.mode": "record_key"
}
}

上面的json代码已存储到一个文件中,以便对其进行跟进

一旦我们开发了连接器,我们就必须将连接器添加到我们的Kafka Connector容器中,这可以通过简单的curl命令来执行。

curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors/ -d @connect-plugins/jdbc-sink.json

消费阅读者

我们开发了一个简单的Spring Boot应用程序来读取Postgre中插入的记录。这个应用程序非常简单,代码将被跳过这篇文章,因为它没关系。

如果你想看到代码,可以在 Github中找到

3.如何运行?

如果要运行演示,则必须执行以下命令。

1. docker-compose up

2.  curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors/ -d @connect-plugins/jdbc-sink.json

3. mvn clean spring-boot:run -pl producer

4. mvn clean spring-boot:run -pl consumer

5. mvn clean spring-boot:run -pl reader

结论

这个演示向我们展示了CQRS实现的一个很好的例子,以及使用Kafka实现这种模式是多么容易。

在我看来,Kafka Stream是Kafka最强大的API,因为它提供了一个简单的API,具有很棒的功能,可以从所有必要的实现中抽象出来,消耗来自Kafka的记录,并允许您专注于开发用于管理大数据流的强大管道。

此外,Spring Framework提供了额外的抽象层,允许我们将Kafka与Spring Boot应用程序集成。

提供了本文的完整源代码


以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

你必须知道的213个C语言问题

你必须知道的213个C语言问题

范立锋、李世欣 / 人民邮电出版社 / 2010-6 / 45.00元

《你必须知道的213个C语言问题》精选了213个在C语言程序设计中经常遇到的问题,目的是帮助读者解决在C语言学习和开发中遇到的实际困难,提高读者学习和开发的效率。这些问题涵盖了C语言与软件开发、C语言基础、编译预处理、字符串、函数、键盘操作、文件、目录和磁盘、数组、指针和结构、DOS服务和BIOS服务、日期和时间、重定向I/O和进程命令、C语言开发常见错误及程序调试等内容,均是作者经过充分的调研,......一起来看看 《你必须知道的213个C语言问题》 这本书的介绍吧!

Base64 编码/解码
Base64 编码/解码

Base64 编码/解码

XML 在线格式化
XML 在线格式化

在线 XML 格式化压缩工具