内容简介:作者 | 冉小龙审校 | Anonymitaet编辑 | Susan + Anonymitaet
作者 | 冉小龙
审校 | Anonymitaet
编辑 | Susan + Anonymitaet
阅读本文需要约 15 分钟。
导读:在 一篇文章了解 Pulsar Functions 中,我们介绍了什么是 Pulsar Functions,它的运行机制以及原理。这篇文章具体介绍如何在 Java 中编写、部署并运行 Java Function。
部署 standalone Pulsar
前面我们说到 Functions 是 Pulsar 的计算层,Functions 的运行依赖于 Pulsar 的运行,为了方便演示,我们使用 Docker 运行一个 standalone Pulsar。
1. 从 Docker Hub 上获取 apachepulsar/pulsar:2.3.0 的镜像。
docker pull apachepulsar/pulsar:2.3.0
2. 启动 Pulsar。
docker run -it \
-p 6650:6650 \
-p 8080:8080 \
-v $PWD/pulsardata:/pulsar/data \
apachepulsar/pulsar:2.3.0 \
bin/pulsar standalone
其中 $PWD/pulsardata 是本地目录,使用 -v 参数将 docker 镜像中的 /pulsar/data 目录映射到本地的 $PWD/pulsardata 目录。
3. 进入 Pulsar 镜像。
docker ps
找到 apachepulsar/pulsar:2.3.0 的 CONTAINER ID 并执行:
docker exec -it [CONTAINER ID] /bin/bash
到目前为止,你成功启动了 standalone Pulsar。
编写 Java Function
下面开始编写自己的第一个 Java Function:
首先,新建一个 maven 工程,pom 文件具体如下:
注意:
这里的 mainClass 需要修改为自己的路径。
-
Java Function 代码示例:
package org.example.test;
import java.util.function.Function;
public class ExclamationFunction implements Function
@Override
public String apply(String s) {
return "This is my function!";
}
}
在上述示例代码中,我们引用了 Java 8 提供的 Functions 接口,在 Pulsar Functions 中,同样提供了 Functions 接口,二者之间的主要区别是:Pulsar Functions 提供了 Context 接口,当你在编写 functions 时,如果需要与 Pulsar Functions 进行交互,可以使用 Context 来获取,示例如下:
部署 Java Function
如下图所示,Pulsar Functions 目前支持两种状态:start 和 stop。start 到处理 function 之间,是 Pulsar Functions 的初始化阶段,例如,setupProducer、setupConsumer、setupLogTopic 等。
当初始化工作完成之后,Pulsar Functions 会动态加载用户的 function 代码,作为 Pulsar Functions 处理逻辑的一部分,如果用户编写的 function 有输出,在图中第二步 HandlerMessage 中,会有相应的 output 输出到下游来做相应的处理。
处理 function 到 stop 之间是 Pulsar Functions 处理 output 的过程。如图第三步所示,Pulsar Functions 会调用 processResult 函数,将相应的 output 处理之后,输出到 output topic 中,同时关闭用户在初始化工作时打开的系统资源。
1. 打包 Function。
mvn package
这个时候打开 target 目录,查看是否有一个类似 java-function-1.0-SNAPSHOT.jar 的 jar 包。
2. 执行 Function。
上面你已经准备好了 standalone Pulsar, 但是 Pulsar 环境中目前还没有你打包好的 jar 文件,所以首先需要 copy 打包好的 jar 文件到 Pulsar 镜像中:
docker cp
$PWD/javafunction/target/java-function-1.0-SNAPSHOT.jar CONTAINER ID:/pulsar
执行如下命令,运行 Function:
./bin/pulsar-admin functions localrun --classname org.example.test.ExclamationFunction --jar java-function-1.0-SNAPSHOT.jar --inputs persistent://public/default/my-topic-1 --output persistent://public/default/test-1 --tenant public --namespace default --name ExclamationFunctio6
以下是每个选项的解释:
上述启动选项仅是作为 Demo 演示,更多详细选项可以通过 ./bin/pulsar-admin functions 直接查看。
启动之后,如果看到下述日志,证明启动成功:
当你将上述启动命令的 localrun 替换为 create ,就能以集群的模式启动,启动后会输出一行日志,具体如下:
"Created successfully"
除了启动 Pulsar Functions 之外,Pulsar 提供了如下命令,用于控制 function 的状态。
停止 Java Function
停止 Pulsar Function 的命令为 stop ,它提供了如下参数列表:
下面以 tenant 、 namespace 、 name 为例,停止 ExclamationFunctio123 Function,具体如下:
root@856932ba3474:/pulsar# ./bin/pulsar-admin functions stop \
--tenant public \
--namespace default \
--name ExclamationFunctio6
Stopped successfully
启动 Java Function
启动 Pulsar Functions 的命令为 start ,它提供了如下参数列表:
下面以 tenant 、 namespace 、 name 为例,启动 ExclamationFunctio123 Function,具体如下:
root@73866a1fa019:/pulsar# ./bin/pulsar-admin functions start \
--tenant public \
--namespace default \
--name ExclamationFunctio123
Started successfully
重启 Java Function
重启 Pulsar Functions 的命令为 restart ,它提供了如下参数列表:
下面以 tenant 、 namespace 、 name 为例,重启 ExclamationFunctio123 Function,具体如下:
root@73866a1fa019:/pulsar# ./bin/pulsar-admin functions restart \
--tenant public \
--namespace default \
--name ExclamationFunctio123
Restarted successfully
更新 Java Function
当启动或者运行一段时间 Pulsar Functions 之后,如果想要更改 Function 的内部运行参数,可以使用 update 命令,update 提供的参数列表与创建一个 Function 基本一致,可以使用 ./bin/pulsar-admin functions 查看参数细节。
下面示例将 ExclamationFunctio123 的 cpu 核数更新为 10,具体如下:
root@73866a1fa019:/pulsar# ./bin/pulsar-admin functions update --tenant public --namespace default --name ExclamationFunctio123 --cpu 10
"Updated successfully"
删除 Java Function
删除 Pulsar Functions 的命令为 delete ,它提供了如下参数列表:
下面以 tenant 、 namespace 、 name 为例,删除 ExclamationFunctio123 Function,具体如下:
root@73866a1fa019:/pulsar# ./bin/pulsar-admin functions delete \
--tenant public \
--namespace default \
--name ExclamationFunctio123
"Deleted successfully"
更多关于 Pulsar 的干货和动态分享,请关注公众号 Apache Pulsar。
以上所述就是小编给大家介绍的《Apache Pulsar 之 Java Function 实践篇》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!
猜你喜欢:- 【语言模型系列】实践篇:ALBERT在房产领域的实践
- Go Module 工程化实践(三):工程实践篇
- 火爆开发概念之——微服务实践篇
- 实践篇 | 推荐系统之矩阵分解模型
- 如果非得了解下 GIT 系统… C 实践篇
- 一个极简、高效的秒杀系统(战术实践篇)
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
The Black Box Society
Frank Pasquale / Harvard University Press / 2015-1-5 / USD 35.00
Every day, corporations are connecting the dots about our personal behavior—silently scrutinizing clues left behind by our work habits and Internet use. The data compiled and portraits created are inc......一起来看看 《The Black Box Society》 这本书的介绍吧!