Apache Pulsar 之 Java Function 实践篇

栏目: Java · 发布时间: 6年前

内容简介:作者 | 冉小龙审校 | Anonymitaet编辑 | Susan + Anonymitaet

作者 | 冉小龙

审校 | Anonymitaet

编辑 | Susan + Anonymitaet

阅读本文需要约 15 分钟。

导读:在  一篇文章了解 Pulsar Functions 中,我们介绍了什么是 Pulsar Functions,它的运行机制以及原理。这篇文章具体介绍如何在 Java 中编写、部署并运行 Java Function。

部署 standalone Pulsar

前面我们说到 Functions 是 Pulsar 的计算层,Functions 的运行依赖于 Pulsar 的运行,为了方便演示,我们使用 Docker 运行一个 standalone Pulsar。

1. 从 Docker Hub 上获取 apachepulsar/pulsar:2.3.0 的镜像。

docker pull apachepulsar/pulsar:2.3.0

2. 启动 Pulsar。

docker run -it \

-p 6650:6650 \

-p 8080:8080 \

-v $PWD/pulsardata:/pulsar/data \

apachepulsar/pulsar:2.3.0 \

bin/pulsar standalone

其中 $PWD/pulsardata 是本地目录,使用  -v 参数将 docker 镜像中的  /pulsar/data 目录映射到本地的  $PWD/pulsardata 目录。

3.  进入 Pulsar 镜像。

docker ps

找到 apachepulsar/pulsar:2.3.0  的  CONTAINER ID 并执行:

docker exec -it [CONTAINER ID] /bin/bash

到目前为止,你成功启动了 standalone Pulsar。

编写 Java Function

下面开始编写自己的第一个 Java Function:

首先,新建一个 maven 工程,pom 文件具体如下:

Apache Pulsar 之 Java Function 实践篇

注意:

org.example.test.ExclamationFunction

这里的 mainClass 需要修改为自己的路径。

  • Java Function 代码示例:

package org.example.test;

import java.util.function.Function;

public class ExclamationFunction implements Function {

@Override

public String apply(String s) {

return "This is my function!";

}

}

在上述示例代码中,我们引用了 Java 8 提供的 Functions 接口,在 Pulsar Functions 中,同样提供了 Functions 接口,二者之间的主要区别是:Pulsar Functions 提供了 Context 接口,当你在编写 functions 时,如果需要与 Pulsar Functions 进行交互,可以使用 Context 来获取,示例如下:

Apache Pulsar 之 Java Function 实践篇

部署 Java Function

如下图所示,Pulsar Functions 目前支持两种状态:start 和 stop。start 到处理 function 之间,是 Pulsar Functions 的初始化阶段,例如,setupProducer、setupConsumer、setupLogTopic 等。

当初始化工作完成之后,Pulsar Functions 会动态加载用户的 function 代码,作为 Pulsar Functions 处理逻辑的一部分,如果用户编写的 function 有输出,在图中第二步 HandlerMessage 中,会有相应的 output 输出到下游来做相应的处理。

处理 function 到 stop 之间是 Pulsar Functions 处理 output 的过程。如图第三步所示,Pulsar Functions 会调用 processResult 函数,将相应的 output 处理之后,输出到 output topic 中,同时关闭用户在初始化工作时打开的系统资源。

Apache Pulsar 之 Java Function 实践篇

1. 打包 Function。

mvn package

这个时候打开 target 目录,查看是否有一个类似 java-function-1.0-SNAPSHOT.jar 的 jar 包。

2. 执行 Function。

上面你已经准备好了 standalone Pulsar, 但是 Pulsar 环境中目前还没有你打包好的 jar 文件,所以首先需要 copy 打包好的 jar 文件到 Pulsar 镜像中:

docker cp

$PWD/javafunction/target/java-function-1.0-SNAPSHOT.jar  CONTAINER ID:/pulsar

执行如下命令,运行 Function:

./bin/pulsar-admin functions localrun --classname org.example.test.ExclamationFunction --jar java-function-1.0-SNAPSHOT.jar --inputs persistent://public/default/my-topic-1 --output persistent://public/default/test-1 --tenant public --namespace default --name ExclamationFunctio6

以下是每个选项的解释:

Apache Pulsar 之 Java Function 实践篇

上述启动选项仅是作为 Demo 演示,更多详细选项可以通过 ./bin/pulsar-admin functions 直接查看。

启动之后,如果看到下述日志,证明启动成功:

Apache Pulsar 之 Java Function 实践篇

当你将上述启动命令的 localrun 替换为  create ,就能以集群的模式启动,启动后会输出一行日志,具体如下:

"Created successfully"

除了启动 Pulsar Functions 之外,Pulsar 提供了如下命令,用于控制 function 的状态。

停止 Java Function

停止 Pulsar Function 的命令为 stop ,它提供了如下参数列表:

Apache Pulsar 之 Java Function 实践篇

下面以 tenantnamespacename 为例,停止  ExclamationFunctio123 Function,具体如下:

root@856932ba3474:/pulsar# ./bin/pulsar-admin functions stop \

--tenant public \

--namespace default \

--name ExclamationFunctio6

Stopped successfully

启动 Java Function

启动 Pulsar Functions 的命令为 start ,它提供了如下参数列表:

Apache Pulsar 之 Java Function 实践篇

下面以 tenantnamespacename 为例,启动  ExclamationFunctio123 Function,具体如下:

root@73866a1fa019:/pulsar# ./bin/pulsar-admin functions start \

--tenant public \

--namespace default \

--name ExclamationFunctio123

Started successfully

重启 Java Function

重启 Pulsar Functions 的命令为 restart ,它提供了如下参数列表:

Apache Pulsar 之 Java Function 实践篇

下面以 tenantnamespacename 为例,重启  ExclamationFunctio123 Function,具体如下:

root@73866a1fa019:/pulsar# ./bin/pulsar-admin functions restart \

--tenant public \

--namespace default \

--name ExclamationFunctio123

Restarted successfully

更新 Java Function

当启动或者运行一段时间 Pulsar Functions 之后,如果想要更改 Function 的内部运行参数,可以使用 update 命令,update 提供的参数列表与创建一个 Function 基本一致,可以使用  ./bin/pulsar-admin functions 查看参数细节。

下面示例将 ExclamationFunctio123 的  cpu 核数更新为 10,具体如下:

root@73866a1fa019:/pulsar# ./bin/pulsar-admin functions update --tenant public --namespace default --name ExclamationFunctio123 --cpu 10

"Updated successfully"

删除 Java Function

删除 Pulsar Functions 的命令为 delete ,它提供了如下参数列表:

Apache Pulsar 之 Java Function 实践篇

下面以 tenantnamespacename 为例,删除  ExclamationFunctio123 Function,具体如下:

root@73866a1fa019:/pulsar# ./bin/pulsar-admin functions delete \

--tenant public \

--namespace default \

--name ExclamationFunctio123

"Deleted successfully"

更多关于 Pulsar 的干货和动态分享,请关注公众号 Apache Pulsar。

Apache Pulsar 之 Java Function 实践篇


以上所述就是小编给大家介绍的《Apache Pulsar 之 Java Function 实践篇》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

The Black Box Society

The Black Box Society

Frank Pasquale / Harvard University Press / 2015-1-5 / USD 35.00

Every day, corporations are connecting the dots about our personal behavior—silently scrutinizing clues left behind by our work habits and Internet use. The data compiled and portraits created are inc......一起来看看 《The Black Box Society》 这本书的介绍吧!

JS 压缩/解压工具
JS 压缩/解压工具

在线压缩/解压 JS 代码

RGB转16进制工具
RGB转16进制工具

RGB HEX 互转工具

RGB CMYK 转换工具
RGB CMYK 转换工具

RGB CMYK 互转工具