本文翻译自 StreamNative 博客《Spring into Pulsar》,作者 Tim Spann,StreamNative 布道师。
译者简介
姜吉宁,开源爱好者、终生学习者、健身爱好者。Github @jjnnzb[1]。
本文我们来探讨如何在 Java 框架——Spring 中整合 Apache Pulsar。文章阐述如何在 Java 中构建基于 Spring 的微服务。在正文内容开始前,我们先介绍 Spring。Spring 是 Java 生态中鼎鼎有名的技术框架,自诞生已有近 20 年历史。Spring 提供了极为方便的装配与控制机制,极大地降低了构建应用的难度。有了 Spring,开发者无需堆砌非业务相关的重复模板代码。基于 Spring,开发者可以如鱼得水般快速开发微服务应用,包括各类 REST API、Web 应用程序、控制台应用程序等。推荐大家深入研究 Spring。
如果你想基于 Spring 来开发自己的第一个应用,推荐打开官方提供的 Spring Starter 起步链接[2]。借助这个链接,可以不费吹灰之力搭建好脚手架,并在此基础之上编码,实现自己的业务逻辑。
在本文示例中,将展示如何基于 Spring Boot 提供的依赖注入机制,为应用程序接入实例化和已配置的 Apache Pulsar 来生产与消费消息。此外,我还会通过使用 AMQP、Kafka 和 MQTT 发送和接收消息来展示 Apache Pulsar 与其他消息传递协议集成的灵活性。
最后,本文将浅析 Reactive Pulsar。强大的响应式框架 Reactive 是想构建 Spring 响应式应用的开发者们的不二之选。
基于 Spring 和 Pulsar 构建空气质量应用
下图展示的是该应用的架构图。如图所示,Apache Pulsar 是该架构图的核心。Pulsar 在其中充当了路由、网关、消息总线和数据分发通道的角色。
选择 Apache Pulsar 的核心原因之一是它具备极强的可扩展性,Pulsar 客户端和应用均支持无限扩展,解决了消息存储与分发在扩展性方面的难题。基于该特性,我们无需做额外的复制便能够复用数据。该特性对很多应用场景非常友好,包括基于 Spark 做的 ETL 任务和基于 Flink 做的实时持续 SQL 流分析等。Pulsar 还为 Spring 微服务无缝支持其他语言编写的服务,包括 Go、Python、C#、C++ 和 Node.JS 等。
点击下图查看示例应用演示视频。
有了 Spring Boot Starter 提供的脚手架,我们可以向 Maven build pom 文件中添加一些依赖,或选择用 Gradle。
首先,配置好 Pulsar 相关版本依赖。本文的示例选择 Pulsar 2.10.0[3] 版本及 JDK 11。开发者当下不应坚持 Java 8 版本,因为在不久的未来,JDK 17 将会是官方推荐的标准版本。
11
2.10.0
接着,导入 Pulsar 客户端依赖。
org.apache.pulsar
pulsar-client
${pulsar.version}
org.apache.pulsar
pulsar-client-admin
${pulsar.version}
org.apache.pulsar
pulsar-client-original
${pulsar.version}
pom
通过以下命令来编译打包:
mvn package
输入以下命令,运行应用程序:
mvn spring-boot:run
在配置文件中(application.resources)填充必要值相关配置,以连接到集群,读取应用数据。该文件一般放在 src/main/resources
目录下。
airnowapi.url=${AIRPORTNOWAPIURL}
topic.name=persistent://public/default/airquality
producer.name=airquality
send.timeout=60
security.mode=off
pulsar.service.url=pulsar://pulsar1:6650
#security.mode=on
#pulsar.service.url=pulsar+ssl://demo.sndemo.snio.cloud:6651
pulsar.oauth2.audience=urn:sn:pulsar:sndemo:demo-cluster
pulsar.oauth2.credentials-url=file:///cr/sndemo-tspann.json
pulsar.oauth2.issuer-url=https://auth.streamnative.cloud/
server.port=8999
#kafka
kafka.bootstrapAddress=pulsar1:9092
kafka.topic.name=airqualitykafka
#mqtt
mqtt.automaticReconnect=true
mqtt.cleanSession=true
mqtt.connectionTimeout=60
mqtt.clientId=airquality-MQTT
mqtt.hostname=pulsar1
mqtt.port=1883
mqtt.topic=airqualitymqtt
#amqp/rabbitmq
amqp.server=pulsar1:5672
amqp.topic=amqp-airquality
如上所示,配置项 security.mode
和 pulsar.service.url
被注释掉了。这么配置的原因是,我可以灵活地在 StreamNative 托管的云生产环境和本地的开发环境之间切换。同时,我们也可以采用自动化流程或使用环境变量来更好地满足生产环境的需求。airnowapi.url
这个变量配置的是用于访问 Air Now REST 数据流的专用令牌,建议配置到环境变量中。如果你也想使用该数据流,请先注册[4]。
我们现在开始构建应用。第一步,配置连接,连接上 Apache Pulsar 集群。
第二步,我们来新建一个 Spring 配置类,来初始化 Pulsar 客户端。在配置类中,通过 @Value 注解来注入 application.properties
中相关的配置项。
@Configuration
public class PulsarConfig {
@Value("${pulsar.service.url}")
String pulsarUrl;
@Value("${security.mode:off}")
String securityMode;
@Value("${pulsar.oauth2.audience}")
String audience;
@Value("${pulsar.oauth2.credentials-url}")
String credentialsUrl;
@Value("${pulsar.oauth2.issuer-url}")
String issuerUrl;
@Bean
public org.apache.pulsar.client.api.PulsarClient pulsarClient() {
PulsarClient client = null;
if (securityMode.equalsIgnoreCase(OFF)) {
try {
client = PulsarClient.builder().serviceUrl(pulsarUrl).build();
} catch (PulsarClientException e) {
e.printStackTrace();
client = null;
}
} else {
try {
try {
client = PulsarClient.builder()
.serviceUrl(pulsarUrl)
.authentication( AuthenticationFactoryOAuth2.clientCredentials(new URL(issuerUrl),
new URL(credentialsUrl),audience)).build();
} catch (MalformedURLException e) {
e.printStackTrace();
}
} catch (PulsarClientException e) {
e.printStackTrace();
client = null;
}
}
return client;
}
}
第三步,在服务中配置生产者,代码如下:
@Configuration
public class PulsarProducerConfig {
@Value("${producer.name:producername}")
String producerName;
@Value("${topic.name:airquality}")
String topicName;
@Autowired
PulsarClient pulsarClient;
@Bean
public Producer getProducer() {
ProducerBuilder producerBuilder = pulsarClient.newProducer(JSONSchema.of(Observation.class))
.topic(topicName)
.producerName(producerName).sendTimeout(60, TimeUnit.SECONDS);
Producer producer = null;
try {
producer = producerBuilder.create();
} catch (PulsarClientException e1) {
e1.printStackTrace();
}
return producer;
}
}
在上述的配置类代码中,我们要构建一个 Pulsar 的生产者,该生产者会使用 Observation 类中的 JSON Schema。该 Observation 类中引入了 FasterXML Jackson 相关注解,但该类实际上就是一个 Java bean,其中记录的是 REST 数据流提供的测量日期、测量时间、状态码、经纬度等信息。
生产者
我们添加上相关的业务逻辑代码,随即对接消息平台,测试消息发送流程。完整的源代码在此 Github 仓库。
@Service
public class PulsarService {
@Autowired
PulsarClient pulsarClient;
@Autowired
ProducerObservation> producer;
public MessageId sendObservation(Observation observation) {
if (observation == null) {
return null;
}
UUID uuidKey = UUID.randomUUID();
MessageId msgID = null;
try {
msgID = producer.newMessage()
.key(uuidKey.toString())
.value(observation)
.send();
} catch (PulsarClientException e) {
e.printStackTrace();
}
return msgID;
}
}
消费者
消息发送完毕之后,我们可以通过 Spring 读取消息。在本节中,我们会构建一套消费程序测试消费数据。如果要填充一些业务逻辑、做消息路由、将消息转换到一至多个主题中,建议通过 Pulsar Function 来实现(可通过 Java、Python 或 Go 编写),而非 Spring Boot 微服务。我在这里提供了两种实现。Pulsar Spring Boot 消费者的源码在可从此 GitHub 仓库[5]中获取。
如果通过 Java Pulsar Function 来处理空气质量数据,可以参考此 GitHub 仓库[6]中的代码。如以下架构图所示,各 Function、微服务、Spark 和 Flink 任务均可作为整个架构中的组成部分,协调处理实时流数据。
我们可以复用生产者中的配置类来连接集群。此外,我们还需要一套消费者的配置代码,该类需要在 application.properties
文件中配置消费者名称、订阅名称、主题名称并注入。在示例代码中,我们配置的订阅类型是 Shared(共享订阅),消费起始点是 Earliest。此外,我们还引入了在 Pulsar 生产者中使用的 Observation 来解析 JSON 数据。
@Configuration
public class PulsarConsumerConfig {
@Autowired
PulsarClient pulsarClient;
@Value("${consumer.name:consumerName}")
String consumerName;
@Value("${topic.name:airquality}")
String topicName;
@Value("${subscription.name:airqualitysubscription}")
String subscriptionName;
@Bean
public Consumer getConsumer() {
Consumer pulsarConsumer = null;
ConsumerBuilder consumerBuilder =
pulsarClient.newConsumer(JSONSchema.of(Observation.class))
.topic(topicName)
.subscriptionName(subscriptionName)
.subscriptionType(SubscriptionType.Shared) .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.consumerName(consumerName);
try {
pulsarConsumer = consumerBuilder.subscribe();
} catch (PulsarClientException e) {
e.printStackTrace();
}
return pulsarConsumer;
}
}
显然,运行消费者程序非常容易。在接收到消息事件之后,进行转换得到普通 Java 对象(Plain Old Java Object,即 POJO),我们可以对数据做任意处理,包括将 Spring 库持久化到数据库、发送到 REST 服务中或存储到文件等。
结语
本文中,我们探索了通过多种消息协议来与 Apache Pulsar 集群通信。由于文章篇幅有限,我们并没有测试全部 Apache Pulsar 支持的消息协议。事实上,我们还可以通过 RocketMQ、WebSocket 来与 Pulsar 集群交互,甚至还可以通过 JDBC 连接到 Pulsar SQL 层(Presto SQL)。
如果你对构建高速的响应式应用感兴趣,推荐试试 Reactive Pulsar 库。Reactive Pulsar 是一款快速高效的库,需要另外一篇单独的文章去介绍,可以点击此链接[7]了解更多信息。
本文列举了基于 Apache Pulsar 和 Spring 来构建应用的要点,包括关键步骤、实战细节等。Pulsar Java 客户端作为 Apache Pulsar 项目中的一等公民,文章借助实例展示了它的强大功能和灵活性。落地实战起来,打造自己的 Pulsar 应用吧!
相关资源
-
• 源码|空气质量示例代码[8]
-
• 源码|Pulsar 提供的空气质量函数[9]
-
• 源码|空气指令消费者[10]
-
• 源码|FLiPN 空气质量校验[11]
-
• 源码|FLiPN 空气质量 REST[12]
-
• GitHub|Spring Boot Starter for Apache Pulsar[13]
-
• Github|Reactive Pulsar Adapter[14]
-
• 文档|Pulsar Java 客户端[15]
更多内容
学习 Pulsar 相关基础知识:尽管并没有覆盖到 Pulsar 的全部基础知识,本篇博客依然不失为一篇优秀的入门文章。如果您刚接触 Pulsar,我们推荐您先学一下 StreamNative 录制的入门课程,帮助您快速入门。
基于 Pulsar 构建微服务:如果您对微服务和 Pulsar 相关主题感兴趣,不妨看看以下资源:
-
• 如何开发 Pulsar Function?[16]
相关阅读
引用链接
[1]
jjnnzb: https://github.com/jjnnzb[2]
起步链接: https://start.spring.io/[3]
Pulsar 2.10.0: https://pulsar.apache.org/en/release-notes/[4]
注册: https://docs.airnowapi.org/[5]
GitHub 仓库: https://github.com/tspannhw/airquality[6]
GitHub 仓库: https://github.com/tspannhw/pulsar-airquality-function[7]
链接: https://github.com/lhotari/reactive-iot-backend-ApacheCon2021[8]
源码|空气质量示例代码: https://github.com/tspannhw/airquality[9]
源码|Pulsar 提供的空气质量函数: https://github.com/tspannhw/pulsar-airquality-function[10]
源码|空气指令消费者: https://github.com/tspannhw/airquality-consumer[11]
源码|FLiPN 空气质量校验: https://github.com/tspannhw/FLiPN-AirQuality-Checks[12]
源码|FLiPN 空气质量 REST: https://github.com/tspannhw/FLiPN-AirQuality-REST[13]
GitHub|Spring Boot Starter for Apache Pulsar: https://github.com/majusko/pulsar-java-spring-boot-starter[14]
Github|Reactive Pulsar Adapter: https://github.com/datastax/reactive-pulsar[15]
文档|Pulsar Java 客户端: https://pulsar.apache.org/docs/en/client-libraries-java/[16]
如何开发Pulsar Function?: https://pulsar.apache.org/docs/en/functions-develop/
本文为从大数据到人工智能博主「xiaozhch5」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://lrting.top/backend/6653/