@levinzhang
2022-08-20T09:36:09.000000Z
字数 14965
阅读 428
by
消费Kafka消息是很容易的,只要有消息生成,我们就可以得到它们,但是除此之外,我们也做不了其他的事情了。如果我们需要实时处理数据(过滤、连接或操作事件)的话,仅仅使用Kafka的消费API可能就不是最好的方式了,因为这将导致代码变得非常复杂。在实时处理Kafka事件方面,Kafka Streams和Quarkus是绝配。
在本系列的第一部分中,我们学习了Apache Kafka和Quarkus的集成,并开发了一个简单的应用,从两个Kafka主题生产和消费事件。
在那个样例中,我们模拟了一个影视流公司,在一个Kafka主题中存储电影信息,在另外一个主题中存储了用户停止观看电影时所发生的每个事件,并捕获了电影已播放的时间。
下图展示了该应用的架构:
我们可以看到,消费消息很简单,只要有消息生成,我们就可以得到它们,但是除此之外,我们也做不了其他的事情了。如果我们需要实时处理数据(比如过滤或操作事件)或者我们需要在事件之间做一些关联,单纯使用Kafka的消费API可能就不是最佳的方式了,因为这会导致代码非常复杂。
Kafka Streams项目能够帮助我们在事件产生时实时消费它们,应用各种转换,执行流连接等,并且可以选择性地将新的数据表述写回主题中。
对于有状态和无状态的流应用来说,Kafka Streams都是理想方案,它能够实现基于时间的操作(例如,围绕给定的时间段对事件进行分组),并且考虑到了Kafka生态系统中普遍存在的可扩展性、可靠性和可维护性。
Kafka Stream由三个元素组成,即输入(源处理器)、输出(sink处理器)和处理器(流处理器)。
源处理器(Source processor):源处理器代表一个Kafka主题。源处理器会发送事件到一个或多个流处理器中。
流处理器(Stream processor):流处理器会将转换/逻辑应用于输入流中,比如连接、分组、计数、映射等。流处理器可以连接至另一个流处理器和/或sink处理器。
Sink处理器(Sink processor):Sink处理器代表了输出的数据,它会连接至一个Kafka主题。
拓扑结构(topology)是由源、处理器和sink组成的无循环图,事件会传入到一个Kafka Streams中,该实例将开始拓扑结构的执行。
Quarkus使用Quarkus KStreams扩展实现与Kafka Streams集成。
使用Quarkus最快捷的方式是通过初始化页面添加所需的依赖。每个服务可能需要不同的依赖,你可以选择Java 11或Java 17。为了实现Quarkus与Kafka Streams的集成,我们至少需要添加Kafka Streams扩展。
正如在本文开始时所提到的,在本系列的第一部分中,我们开发了一个影视流公司,它有两个Kafka主题,其中一个用来存储电影的列表,另外一个主题会在用户停止播放电影时存储用户所在的区域(事件的键),并且会以电影id和播放时间作为事件的值。
所有的这些逻辑都是在名为Movie Plays Producer的生成者服务中创建的,该服务是使用Quarkus开发的。
除此之外,我还使用Quakus开发了一个Movie Plays Consumer服务,它会消费这两个主题的事件并且会在控制台上展示它们(并实现了HTTP服务器端事件)。
但是,这里没有对数据进行任何处理,它只是按照原样进行了接收。如果我们想要在movies和playtimemovies主题之间进行一下连接,在获取电影播放时长的时候得到电影的详细信息而不是id的话,那又该怎么办呢?
如果仅仅使用Kafka消息来实现这样的逻辑会变成一项很复杂的任务,因为我们需要在一个Map中存储Movie信息,并且在每个playedmovie事件发生时,进行匹配处理。
与其为每个用例手工编写代码,不如看一下如何使用Kafka Streams,以及它是如何与Quarkus集成来解决这个问题的。
导航至Quarkus的初始化页面,并选择Apache Kafka Streams扩展来实现与Kafka Streams的集成。然后,选择RestEasy和RestEasy Jackson扩展实现事件从Java对象和JSON之间的编排/解排。同时,取消选中Started Code生成选项。
请参照下面的截图:
你也可以跳过这个手动的步骤并导航至Kafka Stream Quarkus Generator链接,在这里,所有的依赖都已经选择好了。然后,点击Generate your application按钮,以下载应用骨架的压缩文件。
解压文件,并在你最喜欢的IDE中打开项目。
当开发Kafka Stream应用时,我们需要做的第一件事就是创建Topology实例,并定义源、处理器和sink。
在Quarkus中,我们只需要创建一个CDI类,这个类需要包含一个返回Topology
实例的方法。
创建名为TopologyProducer
的类,它将会实现从这两个主题消费事件并连接它们的逻辑。最后,生成的结果将会发送至一个sink处理器,该处理器以控制台输出的形式展示结果。
还有一个元素我们没有提到,在这些场景中它非常有用,那就是Kafka Tables。
一个主题可以包含具有相同键的多个事件。例如,我们可以使用某个键插入一个电影,然后我们可以使用相同的键创建一个新的事件来对电影进行更新:
但是,如果我们想要让movies主题与playtimemovies主题进行连接的话,那我们该选择使用哪个值为1的事件呢?第一个还是第二个?在这个具体的情况中,应该选择最新的那一个,因为它包含了电影的最新版本。为了获取每个事件的最新版本,Kafka Streams有一个表的概念(KTable/GlobalKTable)。
Kafka Streams会浏览指定的主题,获取每个事件的最新版本,并将其放到一个表实例中。
KafkaStream扩展并不会像Kafka Messaging集成那样自动注册SerDes,所以我们需要在拓扑中手动注册它们。
package org.acme;
import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.inject.Produces;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Printed;
import io.quarkus.kafka.client.serialization.ObjectMapperSerde;
@ApplicationScoped
public class TopologyProducer {
private static final String MOVIES_TOPIC = "movies";
private static final String PLAY_MOVIES_TOPIC = "playtimemovies";
@Produces
public Topology getTopCharts() {
final StreamsBuilder builder = new StreamsBuilder();
// 用于Movie和PlayedMovie的SerDes
final ObjectMapperSerde<Movie> movieSerder = new ObjectMapperSerde<>(Movie.class);
final ObjectMapperSerde<MoviePlayed> moviePlayedSerder = new ObjectMapperSerde<>(MoviePlayed.class);
// 为Movies主题创建一个Global Kafka Table
final GlobalKTable<Integer, Movie> moviesTable = builder.globalTable(
MOVIES_TOPIC,
Consumed.with(Serdes.Integer(), movieSerder));
// 连接至playtimemovies主题的流,每当该主题有事件生成都会被该流所消费
final KStream<String, MoviePlayed> playEvents = builder.stream(
PLAY_MOVIES_TOPIC, Consumed.with(Serdes.String(), moviePlayedSerder));
// PlayedMovies使用区域作为键,对象作为值。我们对内容进行map操作,让电影的id作为key(以便于进行连接)并让对象继续作为值
// 另外,我们使用movies表的键(movieId)以及流的键(在前面的map方法中,我们也将其变成了movieId)进行连接
// 最后,结果会流向控制台
playEvents
.map((key, value) -> KeyValue.pair(value.id, value)) // Now key is the id field
.join(moviesTable, (movieId, moviePlayedId) -> movieId, (moviePlayed, movie) -> movie)
.print(Printed.toSysOut());
return builder.build();
}
}
Movie
和MoviePlayed
POJO包含了实现逻辑所需的属性:
Movie
对象如下所示:
package org.acme;
public class Movie {
public int id;
public String name;
public String director;
public String genre;
public Movie(int id, String name, String director, String genre) {
this.id = id;
this.name = name;
this.director = director;
this.genre = genre;
}
}
MoviePlayed
对象如下所示:
package org.acme;
public class MoviePlayed {
public int id;
public long duration;
public MoviePlayed(int id, long duration) {
this.id = id;
this.duration = duration;
}
}
运行Kafka Stream应用之前的最后一步是配置参数,其中最重要的是quarkus.kafka-streams.topics
。它是一个主题列表,在拓扑结构开始处理数据之前,它们就要存在于Kafka集群中,这是一个前提条件。
打开src/main/resources/application.properties
文件并添加如下的代码行:
kafka-streams.cache.max.bytes.buffering=10240
kafka-streams.commit.interval.ms=1000
kafka-streams.metadata.max.age.ms=500
kafka-streams.auto.offset.reset=earliest
kafka-streams.metrics.recording.level=DEBUG
quarkus.kafka-streams.topics=playtimemovies,movies
现在,我们可以测试一下流了。我们启动在上一篇文章中开发的生产者。生产者的源码可以在这里找到。
Quarkus KStreams集成了Quarkus DevServices。所以,我们不需要启动Kafka集群,也不需要配置它的位置,因为Quarkus Dev模式会处理好所有的事情。我们只需要记住在自己的计算机上要有一个运行中的容器环境即可,比如Podman或其他兼容OCI的工具。
在终端窗口中启动生产者服务:
cd movie-plays-producer
./mvnw compile quarkus:dev
2022-04-11 07:49:31,900 INFO [org.acm.mov.MovieKafkaGenerator] (executor-thread-0) movie Cruella played for 287 minutes
2022-04-11 07:49:31,941 INFO [io.quarkus] (Quarkus Main Thread) movie-plays-producer 1.0.0-SNAPSHOT on JVM (powered by Quarkus 2.7.3.Final) started in 4.256s.
2022-04-11 07:49:31,942 INFO [io.quarkus] (Quarkus Main Thread) Profile dev activated. Live Coding activated.
2022-04-11 07:49:31,943 INFO [io.quarkus] (Quarkus Main Thread) Installed features: [cdi, kafka-client, smallrye-context-propagation, smallrye-reactive-messaging, smallrye-reactive-messaging-kafka, vertx]
2022-04-11 07:49:32,399 INFO [org.acm.mov.MovieKafkaGenerator] (executor-thread-0) movie Encanto played for 162 minutes
2022-04-11 07:49:32,899 INFO [org.acm.mov.MovieKafkaGenerator] (executor-thread-0) movie The Hobbit played for 255 minutes
2022-04-11 07:49:33,404 INFO [org.acm.mov.MovieKafkaGenerator] (executor-thread-0) movie Sing 2 played for 264 minutes
2022-04-11 07:49:33,902 INFO [org.acm.mov.MovieKafkaGenerator] (executor-thread-0) movie Encanto played for 28 minutes
2022-04-11 07:49:34,402 INFO [org.acm.mov.MovieKafkaGenerator] (executor-thread-0) movie Star Trek: First Contact played for 137 minutes
2022-04-11 07:49:34,903 INFO [org.acm.mov.MovieKafkaGenerator] (executor-thread-0) movie Star Trek: First Contact played for 277 minutes
2022-04-11 07:49:35,402 INFO [org.acm.mov.MovieKafkaGenerator] (executor-thread-0) movie The Hobbit played for 141 minutes
在另一个终端窗口中,启动我们刚刚开发的Kafka Stream代码:
./mvnw compile quarkus:dev
2022-04-11 07:54:59,321 INFO [org.apa.kaf.str.pro.int.StreamTask] (movie-plays-kstreams-22c86daa-cd28-4956-9d0d-57b6b282a2ea-StreamThread-1) stream-thread [movie-plays-kstreams-22c86daa-cd28-4956-9d0d-57b6b282a2ea-StreamThread-1] task [1_0] Restored and ready to run
2022-04-11 07:54:59,322 INFO [org.apa.kaf.str.pro.int.StreamThread] (movie-plays-kstreams-22c86daa-cd28-4956-9d0d-57b6b282a2ea-StreamThread-1) stream-thread [movie-plays-kstreams-22c86daa-cd28-4956-9d0d-57b6b282a2ea-StreamThread-1] Restoration took 74 ms for all tasks [1_0]
2022-04-11 07:54:59,322 INFO [org.apa.kaf.str.pro.int.StreamThread] (movie-plays-kstreams-22c86daa-cd28-4956-9d0d-57b6b282a2ea-StreamThread-1) stream-thread [movie-plays-kstreams-22c86daa-cd28-4956-9d0d-57b6b282a2ea-StreamThread-1] State transition from PARTITIONS_ASSIGNED to RUNNING
2022-04-11 07:54:59,324 INFO [org.apa.kaf.str.KafkaStreams] (movie-plays-kstreams-22c86daa-cd28-4956-9d0d-57b6b282a2ea-StreamThread-1) stream-client [movie-plays-kstreams-22c86daa-cd28-4956-9d0d-57b6b282a2ea] State transition from REBALANCING to RUNNING
[KSTREAM-LEFTJOIN-0000000005]: 4, Movie [director=Craig Gillespie, genre=Crime Comedy, id=4, name=Cruella]
[KSTREAM-LEFTJOIN-0000000005]: 2, Movie [director=Jonathan Frakes, genre=Space, id=2, name=Star Trek: First Contact]
[KSTREAM-LEFTJOIN-0000000005]: 4, Movie [director=Craig Gillespie, genre=Crime Comedy, id=4, name=Cruella]
[KSTREAM-LEFTJOIN-0000000005]: 1, Movie [director=Peter Jackson, genre=Fantasy, id=1, name=The Hobbit]
[KSTREAM-LEFTJOIN-0000000005]: 4, Movie [director=Craig Gillespie, genre=Crime Comedy, id=4, name=Cruella]
[KSTREAM-LEFTJOIN-0000000005]: 4, Movie [director=Craig Gillespie, genre=Crime Comedy, id=4, name=Cruella]
[KSTREAM-LEFTJOIN-0000000005]: 3, Movie [director=Jared Bush, genre=Animation, id=3, name=Encanto]
[KSTREAM-LEFTJOIN-0000000005]: 5, Movie [director=Garth Jennings, genre=Jukebox Musical Comedy, id=5, name=Sing 2]
在输出中打印的是事件(连接所生成的结果),其中键是movieId
,值是movie
本身。我们现在所具有的功能是,每当一个电影停止播放时,Kafka Stream会对其进行处理并以Movie全量信息的形式对其进行展现。
到目前为止,还不算复杂,对于这样的场景,你可能会想我们根本没有必要使用Kafka Streams。但是,我们再加一些需求,这样你就能看到它的强大之处了。
现在,我们不是在用户每次停掉电影的时候都生成事件,而是只对用户观看时间超过10分钟的电影发送事件。
我们可以使用filter
方法按照持续时长进行过滤。
playEvents
.filter((region, event) -> event.duration >= 10) // filters by duration
.map((key, value) -> KeyValue.pair(value.id, value))
.join(moviesTable, (movieId, moviePlayedId) -> movieId, (moviePlayed, movie) -> movie)
.print(Printed.toSysOut());
重启应用,我们可以发现观看时长小于10分钟的电影将不会被处理。
我们发现,Kafka Streams有助于代码的整洁性,接下来,我们添加最后的需求。现在,我们对每部电影的播放时长并不感兴趣,而是对每部电影有多少次超过10分钟的播放感兴趣。
到目前为止,对事件的处理都是无状态的,因为事件都是遵循这样的步骤,即接收、处理并发送至sink处理器(也就是发送至一个主题或控制台输出),但是,为了统计某部电影播放的次数,我们需要在内存记住电影被播放了多少次,并且当任意用户再次观看超过10分钟的时候,要对这个统计数字递增一次。此时,事件的处理就要以有状态的方式进行了。
我们需要做的第一件事就是创建一个Java类,以存储电影的名称及其播放的次数。
public class MoviePlayCount {
public String name;
public int count;
public MoviePlayCount aggregate(String name) {
this.name = name;
this.count++;
return this;
}
@Override
public String toString() {
return "MoviePlayCount [count=" + count + ", name=" + name + "]";
}
}
这是一个计数器类,它依然需要两样东西:
aggregate
方法的逻辑。关于第一个问题,我们需要使用KeyValueBytesStoreSupplier接口。
public static final String COUNT_MOVIE_STORE = "countMovieStore";
KeyValueBytesStoreSupplier storeSupplier = Stores.persistentKeyValueStore(COUNT_MOVIE_STORE);
对于第二个问题,Kafka Streams有一个用来聚合结果的aggregate
方法。
在我们的用例中,也就是每部电影播放时长超过10分钟的次数。
// MoviePlayCount可以被序列化和反序列化
ObjectMapperSerde<MoviePlayCount> moviePlayCountSerder = new ObjectMapperSerde<>(MoviePlayCount.class);
// 这是之前的连接操作,其中键是电影id,值是电影
.join(moviesTable, (movieId, moviePlayedId) -> movieId, (moviePlayed, movie) -> movie)
// 根据键对事件进行分组,在本例中,也就是电影的id
.groupByKey(Grouped.with(Serdes.Integer(), movieSerder))
// 聚合方法,如果MoviePlayCount已经创建的话,获取该对象(如果尚未创建的话,会创建该实例)并调用其aggregate方法,以对观看次数进行递增
.aggregate(MoviePlayCount::new,
(movieId, movie, moviePlayCounter) -> moviePlayCounter.aggregate(movie.name),
Materialized.<Integer, MoviePlayCount> as(storeSupplier)
.withKeySerde(Serdes.Integer())
.withValueSerde(moviePlayCountSerder)
)
重启应用,在控制台上将会展示电影播放的次数。
提示:要重启应用,只需在终端输入“s”,应用将会自动重启。
应用重启之后,控制台将会展示每部电影的状态。
[KTABLE-TOSTREAM-0000000011]: 4, MoviePlayCount [count=13, name=Cruella]
[KTABLE-TOSTREAM-0000000011]: 3, MoviePlayCount [count=11, name=Encanto]
[KTABLE-TOSTREAM-0000000011]: 5, MoviePlayCount [count=14, name=Sing 2]
[KTABLE-TOSTREAM-0000000011]: 2, MoviePlayCount [count=15, name=Star Trek: First Contact]
[KTABLE-TOSTREAM-0000000011]: 1, MoviePlayCount [count=16, name=The Hobbit]
[KTABLE-TOSTREAM-0000000011]: 2, MoviePlayCount [count=16, name=Star Trek: First Contact]
[KTABLE-TOSTREAM-0000000011]: 3, MoviePlayCount [count=12, name=Encanto]
[KTABLE-TOSTREAM-0000000011]: 2, MoviePlayCount [count=17, name=Star Trek: First Contact]
[KTABLE-TOSTREAM-0000000011]: 5, MoviePlayCount [count=15, name=Sing 2]
[KTABLE-TOSTREAM-0000000011]: 4, MoviePlayCount [count=14, name=Cruella]
[KTABLE-TOSTREAM-0000000011]: 1, MoviePlayCount [count=17, name=The Hobbit]
[KTABLE-TOSTREAM-0000000011]: 4, MoviePlayCount [count=15, name=Cruella]
[KTABLE-TOSTREAM-0000000011]: 4, MoviePlayCount [count=16, name=Cruella]
交互式查询
因为我们将sink处理器设置成了System.out
流,所以聚合结果会流向控制台。
.toStream()
.print(Printed.toSysOut());
但是,我们也可以将结果流发往一个Kafka主题:
.to("counter_movies", Produced.with(Serdes.Integer(), moviePlayCountSerder)
);
但是,如果我们感兴趣的不是每次对新事件的反应,而只是想查询特定的电影此时播放的次数,那又该怎么办呢?
Kafka Streams的交互式查询(interactive query)允许我们直接查询底层存储,以获取给定键相关的值。
首先,我们创建一个名为MoviePlayCountData
的类来存储查询结果。按照这种方式,我们可以解耦Kafka Streams使用的类与应用中其他部分所使用的类。
public class MoviePlayCountData {
private String name;
private int count;
public MoviePlayCountData(String name, int count) {
this.name = name;
this.count = count;
}
public int getCount() {
return count;
}
public String getName() {
return name;
}
}
现在,创建名为InteractiveQueries
的类来实现对状态存储(KeyValueBytesStoreSupplier
)的访问并根据电影的id
查询它被播放的次数。
import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import static org.apache.kafka.streams.StoreQueryParameters.fromNameAndType;
import java.util.Optional;
@ApplicationScoped
public class InteractiveQueries {
@Inject
KafkaStreams streams;
public <MoviePlayCountData> getMoviePlayCountData(int id) {
// 获取状态存储并根据电影id获取播放次数
MoviePlayCount moviePlayCount = getMoviesPlayCount().get(id);
// 如果有结果的话
if (moviePlayCount != null) {
// 将结果包装到MoviePlayCountData中
return Optional.of(new MoviePlayCountData(moviePlayCount.name, moviePlayCount.count));
} else {
return Optional.empty();
}
}
// 获取状态存储
private ReadOnlyKeyValueStore<Integer, MoviePlayCount> getMoviesPlayCount() {
while (true) {
try {
return streams.store(fromNameAndType(TopologyProducer.COUNT_MOVIE_STORE, QueryableStoreTypes.keyValueStore()));
} catch (InvalidStateStoreException e) {
// 忽略之,此时存储尚未就绪
}
}
}
}
现在,我们可以添加一个简单的REST端点来运行该查询。
import java.util.Optional;
import javax.inject.Inject;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
@Path("/movie")
public class MovieCountResource {
// 注入前文的类进行查询
@Inject
InteractiveQueries interactiveQueries;
@GET
@Path("/data/{id}")
public Response movieCountData(@PathParam("id") int id) {
Optional<MoviePlayCountData> moviePlayCountData = interactiveQueries.getMoviePlayCountData(id);
// 根据结果判定返回值还是404
if (moviePlayCountData.isPresent()) {
return Response.ok(moviePlayCountData.get()).build();
} else {
return Response.status(Status.NOT_FOUND.getStatusCode(),
"No data found for movie " + id).build();
}
}
}
Kafka Stream实现的模式如下图所示:
Kafka Streams应用可以进行扩展,所以流会分布到多个实例中。在这种情况下,每个实例都包含聚合结果的一个子集,所以要想获得总的聚合结果,我们需要通过将REST API重定向到另外的实例来获取其他实例的数据。
Kafka Streams提供了一个API,可以知道要请求的数据是在本地Kafka Streams存储中还是在其他的主机中。
虽然这个过程并不复杂,但已经超出了本文的讨论范围。
到目前为止,我们已经看到,将Quarkus应用连接到Apache Kafka并生产和消费主题中的消息/事件是很容易的。此外,还看到Kafka Streams让我们不仅可以消费Kafka中消息,还能够实时处理它们,进行转换、过滤等操作,例如以同步的方式消费结果数据。 这是一项强大的技术,当需要处理的数据不断变化时,它可以轻松扩展,提供实时的体验。
但是,我们还没有解决该架构的最后一个问题。通常情况下,数据并不是存储在一个地方。电影信息可能存储在关系型数据库中,而电影的播放信息则存储在一个Kafka主题中。那么,该如何保持这两个地方的信息更新,以便Kafka Streams能正确地连接数据呢?
这里有一个缺失的部分,名为Debezium的项目可以帮助我们解决这个问题。我们将用一整篇文章来介绍Debezium和Quarkus,敬请持续关注。
Alex Soto是红帽公司的开发者体验总监。他对Java领域、软件自动化充满热情,他相信开源软件模式。Soto是Manning的《Testing Java Microservices》和O’Reilly的《Quarkus Cookbook》两本书的共同作者,他还是多个开源项目的贡献者。自 2017 年以来,他一直是Java Champion,是国际演讲者和Salle URL 大学的教师。你可以在 Twitter 上关注他(Alex Soto ⚛️),随时了解 Kubernetes 和 Java 领域的动态。
查看英文原文:Kafka Streams and Quarkus: Real-Time Processing Events