Apache Pulsar 之 Java Function 实践篇

栏目: Java · 发布时间: 5年前

内容简介:作者 | 冉小龙审校 | Anonymitaet编辑 | Susan + Anonymitaet

作者 | 冉小龙

审校 | Anonymitaet

编辑 | Susan + Anonymitaet

阅读本文需要约 15 分钟。

导读:在  一篇文章了解 Pulsar Functions 中,我们介绍了什么是 Pulsar Functions,它的运行机制以及原理。这篇文章具体介绍如何在 Java 中编写、部署并运行 Java Function。

部署 standalone Pulsar

前面我们说到 Functions 是 Pulsar 的计算层,Functions 的运行依赖于 Pulsar 的运行,为了方便演示,我们使用 Docker 运行一个 standalone Pulsar。

1. 从 Docker Hub 上获取 apachepulsar/pulsar:2.3.0 的镜像。

docker pull apachepulsar/pulsar:2.3.0

2. 启动 Pulsar。

docker run -it \

-p 6650:6650 \

-p 8080:8080 \

-v $PWD/pulsardata:/pulsar/data \

apachepulsar/pulsar:2.3.0 \

bin/pulsar standalone

其中 $PWD/pulsardata 是本地目录,使用  -v 参数将 docker 镜像中的  /pulsar/data 目录映射到本地的  $PWD/pulsardata 目录。

3.  进入 Pulsar 镜像。

docker ps

找到 apachepulsar/pulsar:2.3.0  的  CONTAINER ID 并执行:

docker exec -it [CONTAINER ID] /bin/bash

到目前为止,你成功启动了 standalone Pulsar。

编写 Java Function

下面开始编写自己的第一个 Java Function:

首先,新建一个 maven 工程,pom 文件具体如下:

Apache Pulsar 之 Java Function 实践篇

注意:

org.example.test.ExclamationFunction

这里的 mainClass 需要修改为自己的路径。

  • Java Function 代码示例:

package org.example.test;

import java.util.function.Function;

public class ExclamationFunction implements Function {

@Override

public String apply(String s) {

return "This is my function!";

}

}

在上述示例代码中,我们引用了 Java 8 提供的 Functions 接口,在 Pulsar Functions 中,同样提供了 Functions 接口,二者之间的主要区别是:Pulsar Functions 提供了 Context 接口,当你在编写 functions 时,如果需要与 Pulsar Functions 进行交互,可以使用 Context 来获取,示例如下:

Apache Pulsar 之 Java Function 实践篇

部署 Java Function

如下图所示,Pulsar Functions 目前支持两种状态:start 和 stop。start 到处理 function 之间,是 Pulsar Functions 的初始化阶段,例如,setupProducer、setupConsumer、setupLogTopic 等。

当初始化工作完成之后,Pulsar Functions 会动态加载用户的 function 代码,作为 Pulsar Functions 处理逻辑的一部分,如果用户编写的 function 有输出,在图中第二步 HandlerMessage 中,会有相应的 output 输出到下游来做相应的处理。

处理 function 到 stop 之间是 Pulsar Functions 处理 output 的过程。如图第三步所示,Pulsar Functions 会调用 processResult 函数,将相应的 output 处理之后,输出到 output topic 中,同时关闭用户在初始化工作时打开的系统资源。

Apache Pulsar 之 Java Function 实践篇

1. 打包 Function。

mvn package

这个时候打开 target 目录,查看是否有一个类似 java-function-1.0-SNAPSHOT.jar 的 jar 包。

2. 执行 Function。

上面你已经准备好了 standalone Pulsar, 但是 Pulsar 环境中目前还没有你打包好的 jar 文件,所以首先需要 copy 打包好的 jar 文件到 Pulsar 镜像中:

docker cp

$PWD/javafunction/target/java-function-1.0-SNAPSHOT.jar  CONTAINER ID:/pulsar

执行如下命令,运行 Function:

./bin/pulsar-admin functions localrun --classname org.example.test.ExclamationFunction --jar java-function-1.0-SNAPSHOT.jar --inputs persistent://public/default/my-topic-1 --output persistent://public/default/test-1 --tenant public --namespace default --name ExclamationFunctio6

以下是每个选项的解释:

Apache Pulsar 之 Java Function 实践篇

上述启动选项仅是作为 Demo 演示,更多详细选项可以通过 ./bin/pulsar-admin functions 直接查看。

启动之后,如果看到下述日志,证明启动成功:

Apache Pulsar 之 Java Function 实践篇

当你将上述启动命令的 localrun 替换为  create ,就能以集群的模式启动,启动后会输出一行日志,具体如下:

"Created successfully"

除了启动 Pulsar Functions 之外,Pulsar 提供了如下命令,用于控制 function 的状态。

停止 Java Function

停止 Pulsar Function 的命令为 stop ,它提供了如下参数列表:

Apache Pulsar 之 Java Function 实践篇

下面以 tenantnamespacename 为例,停止  ExclamationFunctio123 Function,具体如下:

root@856932ba3474:/pulsar# ./bin/pulsar-admin functions stop \

--tenant public \

--namespace default \

--name ExclamationFunctio6

Stopped successfully

启动 Java Function

启动 Pulsar Functions 的命令为 start ,它提供了如下参数列表:

Apache Pulsar 之 Java Function 实践篇

下面以 tenantnamespacename 为例,启动  ExclamationFunctio123 Function,具体如下:

root@73866a1fa019:/pulsar# ./bin/pulsar-admin functions start \

--tenant public \

--namespace default \

--name ExclamationFunctio123

Started successfully

重启 Java Function

重启 Pulsar Functions 的命令为 restart ,它提供了如下参数列表:

Apache Pulsar 之 Java Function 实践篇

下面以 tenantnamespacename 为例,重启  ExclamationFunctio123 Function,具体如下:

root@73866a1fa019:/pulsar# ./bin/pulsar-admin functions restart \

--tenant public \

--namespace default \

--name ExclamationFunctio123

Restarted successfully

更新 Java Function

当启动或者运行一段时间 Pulsar Functions 之后,如果想要更改 Function 的内部运行参数,可以使用 update 命令,update 提供的参数列表与创建一个 Function 基本一致,可以使用  ./bin/pulsar-admin functions 查看参数细节。

下面示例将 ExclamationFunctio123 的  cpu 核数更新为 10,具体如下:

root@73866a1fa019:/pulsar# ./bin/pulsar-admin functions update --tenant public --namespace default --name ExclamationFunctio123 --cpu 10

"Updated successfully"

删除 Java Function

删除 Pulsar Functions 的命令为 delete ,它提供了如下参数列表:

Apache Pulsar 之 Java Function 实践篇

下面以 tenantnamespacename 为例,删除  ExclamationFunctio123 Function,具体如下:

root@73866a1fa019:/pulsar# ./bin/pulsar-admin functions delete \

--tenant public \

--namespace default \

--name ExclamationFunctio123

"Deleted successfully"

更多关于 Pulsar 的干货和动态分享,请关注公众号 Apache Pulsar。

Apache Pulsar 之 Java Function 实践篇


以上所述就是小编给大家介绍的《Apache Pulsar 之 Java Function 实践篇》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

领域特定语言

领域特定语言

Martin Fowler / ThoughtWorks中国 / 机械工业出版社华章公司 / 2013-3 / 89.00元

本书是DSL领域的丰碑之作,由世界级软件开发大师和软件开发“教父”Martin Fowler历时多年写作而成,ThoughtWorks中国翻译。全面详尽地讲解了各种DSL及其构造方式,揭示了与编程语言无关的通用原则和模式,阐释了如何通过DSL有效提高开发人员的生产力以及增进与领域专家的有效沟通,能为开发人员选择和使用DSL提供有效的决策依据和指导方法。 全书共57章,分为六个部分:第一部分介......一起来看看 《领域特定语言》 这本书的介绍吧!

在线进制转换器
在线进制转换器

各进制数互转换器

SHA 加密
SHA 加密

SHA 加密工具

UNIX 时间戳转换
UNIX 时间戳转换

UNIX 时间戳转换