内容简介:本文是David Romero一篇Spring + Kafka Stream实现CQRS的案例代码:去年九月,我的同事伊万·古铁雷斯和我谈到我们cowokers如何实现事件与Kafka Stream,我开发了一个Kafka Stream,它读取包含来自Twitter的“Java”字样的推文,按用户名分组推文,并选择最喜欢的推文。管道结束端将重新选择的信息再发送到PostgreSQL由于我们收到了积极的反馈,并且我们学到了很多东西,所以我想分享这个演示,以便任何想要看一眼的人都可以使用它。
本文是David Romero一篇Spring + Kafka Stream实现CQRS的案例代码:
去年九月,我的同事伊万·古铁雷斯和我谈到我们cowokers如何实现事件与Kafka Stream,我开发了一个Kafka Stream,它读取包含来自Twitter的“Java”字样的推文,按用户名分组推文,并选择最喜欢的推文。管道结束端将重新选择的信息再发送到PostgreSQL
由于我们收到了积极的反馈,并且我们学到了很多东西,所以我想分享这个演示,以便任何想要看一眼的人都可以使用它。
2.实施
该演示是基于Kafka和Kafka Streams 的CQRS模式的实现。Kafka能够解耦read(Query)和write(Command)操作,这有助于我们更快地开发事件源应用。
堆栈
整个堆栈已在 Docker 中实现,因为它集成了多个 工具 及其隔离级别时的简单性。堆栈由
version: '3.1'
services:
#############
# Kafka
#############
zookeeper:
image: confluentinc/cp-zookeeper
container_name: zookeeper
network_mode: host
ports:
- "2181:2181"
environment:
ZOOKEEPER_CLIENT_PORT: 2181
kafka:
image: confluentinc/cp-kafka
container_name: kafka
network_mode: host
depends_on:
- zookeeper
ports:
- "9092:9092"
environment:
KAFKA_ZOOKEEPER_CONNECT: localhost:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 # We have only 1 broker, so offsets topic can only have one replication factor.
connect:
image: confluentinc/cp-kafka-connect
container_name: kafka-connect
network_mode: host
ports:
- "8083:8083"
depends_on:
- zookeeper
- kafka
volumes:
- $PWD/connect-plugins:/etc/kafka-connect/jars # in this volume is located the postgre driver.
environment:
CONNECT_BOOTSTRAP_SERVERS: kafka:9092
CONNECT_REST_PORT: 8083 # Kafka connect creates an endpoint in order to add connectors
CONNECT_REST_ADVERTISED_HOST_NAME: "kafka-connect"
CONNECT_GROUP_ID: kafka-connect
CONNECT_ZOOKEEPER_CONNECT: zookeeper:2181
CONNECT_CONFIG_STORAGE_TOPIC: kafka-connect-config
CONNECT_OFFSET_STORAGE_TOPIC: kafka-connect-offsets
CONNECT_STATUS_STORAGE_TOPIC: kafka-connect-status
CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1 # We have only 1 broker, so we can only have 1 replication factor.
CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
CONNECT_KEY_CONVERTER: "org.apache.kafka.connect.storage.StringConverter" # We receive a string as key and a json as value
CONNECT_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.storage.StringConverter"
CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
CONNECT_PLUGIN_PATH: /usr/share/java,/etc/kafka-connect/jars
#############
# PostgreSQL
#############
db:
container_name: postgresql
network_mode: host
image: postgres
restart: always
ports:
- "5432:5432"
environment:
POSTGRES_DB: influencers
POSTGRES_USER: user
POSTGRES_PASSWORD: 1234
上面配置在 docker-compose 文件中,也包含此演示中涉及的所有工具:
1. Zookeper:卡夫卡不可分割的合作伙伴。
2. Kafka:主要角色。你需要设置zookeeper ip。
3. Kafka Connector:4个主要的Kafka核心API之一。它负责读取所提供topic的记录并将其插入PostgreSQL。
4. PostgreSQL:SQL数据库。
生产者
这是写入数据到Kafka的应用程序。我们的基础设施负责阅读Twitter中包含“Java”字样的推文并将其发送给Kafka。
以下代码包含两个部分:Twitter Stream和Kafka Producer。
Twitter Stream:创建推文数据流。如果要在消费之前过滤流,也可以添加FilterQuery。您需要凭据才能访问Twitter API。
Kafka Producer:它将记录发送给kafka。在我们的演示中,它将没有key的记录发送到“推文”主题。
@SpringBootApplication
@Slf4j
public class DemoTwitterKafkaProducerApplication {
public static void main(String[] args) {
// Kafka config
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); //Kafka cluster hosts.
properties.put(ProducerConfig.CLIENT_ID_CONFIG, "demo-twitter-kafka-application-producer"); // Group id
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
Producer<String, String> producer = new KafkaProducer<>(properties);
// Twitter Stream
final TwitterStream twitterStream = new TwitterStreamFactory().getInstance();
final StatusListener listener = new StatusListener() {
public void onStatus(Status status) {
final long likes = getLikes(status);
final String tweet = status.getText();
final String content = status.getUser().getName() + "::::" + tweet + "::::" + likes;
log.info(content);
producer.send(new ProducerRecord<>("tweets", content));
}
//Some methods have been omitted for simplicity.
private long getLikes(Status status) {
return status.getRetweetedStatus() != null ? status.getRetweetedStatus().getFavoriteCount() : 0; // Likes can be null.
}
};
twitterStream.addListener(listener);
final FilterQuery tweetFilterQuery = new FilterQuery();
tweetFilterQuery.track(new String[] { "Java" });
twitterStream.filter(tweetFilterQuery);
SpringApplication.run(DemoTwitterKafkaProducerApplication.class, args);
Runtime.getRuntime().addShutdownHook(new Thread(() -> producer.close())); //Kafka producer should close when application finishes.
}
}
这个应用程序是一个Spring Boot应用程序。
Kafka Stream
我们基础设施的主要部分负责从“tweets”主题topic中阅读推文,按用户名分组,计算推文,提取最喜欢的推文并将其发送给“influencers”的新主题。
让我们关注下一个代码块的两个最重要的方法:
1. 流方法:Kafka Stream Java API遵循与 Java 8 Stream API相同的命名法。在管道中执行的第一个操作是选择密钥,因为每次密钥改变时,在主题中执行重新分区操作。所以,我们应该尽可能少地改变密钥。然后,我们必须计算大多数人喜欢积累的推文。并且由于此操作是有状态操作,我们需要执行聚合。聚合操作将在以下项目中详述。最后,我们需要将记录发送到名为“influencers”的输出主题。对于此任务,我们需要将Influencer类映射到InfluencerJsonSchema类,然后使用to方法。InfluencerJsonSchema类将在Kafka Connector部分中解释。Peek方法用于调试目的。
2.aggregateInfoToInfluencer方法:这是一个有状态操作。收到三个参数:用户名,主题的原始推文和之前存储的Influencer。在推文计数器中添加一个,并将喜欢与喜欢的推文进行比较。返回Influecer类的新实例,以保持不变性。
@Configuration
@EnableKafkaStreams
static class KafkaConsumerConfiguration {
final Serde<Influencer> jsonSerde = new JsonSerde<>(Influencer.class);
final Materialized<String, Influencer, KeyValueStore<Bytes, byte[]>> materialized = Materialized.<String, Influencer, KeyValueStore<Bytes, byte[]>>as("aggregation-tweets-by-likes").withValueSerde(jsonSerde);
@Bean
KStream<String, String> stream(StreamsBuilder streamBuilder){
final KStream<String, String> stream = streamBuilder.stream("tweets");
stream
.selectKey(( key , value ) -> String.valueOf(value.split("::::")[0]))
.groupByKey()
.aggregate(Influencer::init, this::aggregateInfoToInfluencer, materialized)
.mapValues(InfluencerJsonSchema::new)
.toStream()
.peek( (username, jsonSchema) -> log.info("Sending a new tweet from user: {}", username))
.to("influencers", Produced.with(Serdes.String(), new JsonSerde<>(InfluencerJsonSchema.class)));
return stream;
}
private Influencer aggregateInfoToInfluencer(String username, String tweet, Influencer influencer) {
final long likes = Long.valueOf(tweet.split("::::")[2]);
if ( likes >= influencer.getLikes() ) {
return new Influencer(influencer.getTweets()+1, username, String.valueOf(tweet.split("::::")[1]), likes);
} else {
return new Influencer(influencer.getTweets()+1, username, influencer.getContent(), influencer.getLikes());
}
}
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public StreamsConfig kStreamsConfigs(KafkaProperties kafkaProperties) {
Map<String, Object> props = new HashMap<>();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "demo-twitter-kafka-application");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
return new StreamsConfig(props);
}
}
@EnableKafkaStreams注释和kStreamsConfigs方法负责将Kafka Stream API与Spring Framework集成。
在上面的代码块中提到了Influencer 类,为了便于阅读,这里提供了影响者类的代码:
@RequiredArgsConstructor
@Getter
public static class Influencer {
final long tweets;
final String username;
final String content;
final long likes;
static Influencer init() {
return new Influencer(0, "","", 0);
}
@JsonCreator
static Influencer fromJson(@JsonProperty("tweets") long tweetCounts, @JsonProperty("username") String username, @JsonProperty("content") String content, @JsonProperty("likes") long likes) {
return new Influencer(tweetCounts, username, content, likes);
}
}
fromJson由于Kafka Stream使用了其序列化方法,该方法是Kafka强制性要求的。如果您想了解有关此主题的更多信息,请参阅Kafka Stream Serde。
这个应用程序是一个Spring Boot应用程序。
Kafka Connector
一旦我们将数据喂给了我们的新主题“influencers”,我们就必须将数据保存到Postgre。对于此任务,Kafka提供了一个名为Kafka Connect的强大API 。由Apache Kafka开发人员创建的Confluent公司为许多第三方工具开发了多个连接器。对于JDBC,推出两个连接器:source和sink。
1. source源连接器是从jdbc驱动程序读取数据并将数据发送到Kafka。
2. sink接收器连接器从Kafka读取数据并将其发送到jdbc驱动程序。
我们将使用JDBC Sink连接器,此连接器需要schema信息才能将主题记录映射到 sql 记录。在我们的演示中,schema 在主题的记录中提供。因此,我们必须在数据管道中将Influecer类映射到InfluencerJsonSchema类。
在以下代码中,您可以看到schema 的发送方式。
/** * https://gist.github.com/rmoff/2b922fd1f9baf3ba1d66b98e9dd7b364 * */
@Getter
public class InfluencerJsonSchema {
Schema schema;
Influencer payload;
InfluencerJsonSchema(long tweetCounts, String username, String content, long likes) {
this.payload = new Influencer(tweetCounts, username, content, likes);
Field fieldTweetCounts = Field.builder().field("tweets").type("int64").build();
Field fieldContent = Field.builder().field("content").type("string").build();
Field fieldUsername = Field.builder().field("username").type("string").build();
Field fieldLikes = Field.builder().field("likes").type("int64").build();
this.schema = new Schema("struct", Arrays.asList(fieldUsername,fieldContent,fieldLikes,fieldTweetCounts));
}
public InfluencerJsonSchema(Influencer influencer) {
this(influencer.getTweets(),influencer.getUsername(),influencer.getContent(),influencer.getLikes());
}
@Getter
@AllArgsConstructor
static class Schema {
String type;
List<Field> fields;
}
@Getter
@Builder
static class Field {
String type;
String field;
}
}
然后,我们需要配置我们的Kafka连接器。应提供源主题,目标表,主键或URL连接。特别提到'insert.mode'字段。我们使用'upsert'模式,因为主键是用户名,因此将根据用户之前是否已经持久来插入或更新记录。
{
"name": "jdbc-sink",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max": "1",
"topics": "influencers",
"table.name.format": "influencer",
"connection.url": "jdbc:postgresql://postgresql:5432/influencers?user=user&password=1234",
"auto.create": "true",
"insert.mode": "upsert",
"pk.fields": "username",
"pk.mode": "record_key"
}
}
上面的json代码已存储到一个文件中,以便对其进行跟进
一旦我们开发了连接器,我们就必须将连接器添加到我们的Kafka Connector容器中,这可以通过简单的curl命令来执行。
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @connect-plugins/jdbc-sink.json
消费阅读者
我们开发了一个简单的Spring Boot应用程序来读取Postgre中插入的记录。这个应用程序非常简单,代码将被跳过这篇文章,因为它没关系。
如果你想看到代码,可以在 Github中找到
3.如何运行?
如果要运行演示,则必须执行以下命令。
1. docker-compose up
2. curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @connect-plugins/jdbc-sink.json
3. mvn clean spring-boot:run -pl producer
4. mvn clean spring-boot:run -pl consumer
5. mvn clean spring-boot:run -pl reader
结论
这个演示向我们展示了CQRS实现的一个很好的例子,以及使用Kafka实现这种模式是多么容易。
在我看来,Kafka Stream是Kafka最强大的API,因为它提供了一个简单的API,具有很棒的功能,可以从所有必要的实现中抽象出来,消耗来自Kafka的记录,并允许您专注于开发用于管理大数据流的强大管道。
此外,Spring Framework提供了额外的抽象层,允许我们将Kafka与Spring Boot应用程序集成。
提供了本文的完整源代码
以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网
猜你喜欢:- 使用Akka实现并发
- 使用GOLANG实现猴子排序
- 使用 WebSocket 实现 JsBridge
- 使用 RabbitMQ 实现 RPC
- 使用Kafka实现事件溯源
- 使用 Swift 实现 Promise
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
Just My Type
Simon Garfield / Profile Books / 2010-10-21 / GBP 14.99
What's your type? Suddenly everyone's obsessed with fonts. Whether you're enraged by Ikea's Verdanagate, want to know what the Beach Boys have in common with easy Jet or why it's okay to like Comic Sa......一起来看看 《Just My Type》 这本书的介绍吧!