越来越多的企业选择将数据存储到云平台中。对于大部分软件体系结构而言,“数据上云”至关重要。将数据迁移上云,有助于降低企业采购软硬件的成本,减少监控、管理工作,提供较大存储容量。而且,云存储支持数据备份,保护数据免受勒索软件的侵害。
许多 Pulsar 用户选择将数据存储在各种云平台中,例如 Amazon Simple Storage Service(Amazon S3)或 Google Cloud Storage(Google GCS)等。如果没有统一的应用程序将主题级别的数据迁移到云存储,Pulsar 用户必须自己编写解决方案。这是一项繁琐的任务。今天,我们很高兴地宣布 Apache Pulsar 引入 Cloud Storage Sink 连接器(以下简称为 Cloud Storage 连接器)。Cloud Storage 连接器采用简单、可靠的方式,帮助用户将数据从 Apache Pulsar 迁移到云存储的对象中。
什么是 Cloud Storage 连接器
Cloud Storage 连接器定期轮询 Pulsar 数据,然后将其以 Avro、JSON 或 Parquet 格式存储到云存储的对象(AWS S3、Google GCS 等)中。根据用户的环境设置,Cloud Storage 连接器保证向消费者(consumer)“只发送一次” 消息。
Cloud Storage 连接器支持基于 Pulsar 主题分区或者基于时间(以天或小时为单位)的 partitioner。Partitioner 将 Pulsar 主题分区拆分成为多个数据块。数据块相当于云存储中的对象,其虚拟路径使用 Pulsar 分区 ID和该数据块的起始偏移量进行编码。对 Pulsar 分区和该数据块的起始偏移量进行编码。数据块的大小取决于云存储写入的记录的数量和 schema 兼容性。如果没有在配置中指定 partitioner,则使用保留 Pulsar 分区的缺省 partitioner。
Cloud Storage 连接器支持以下功能:
-
确保严格一次(Exactly-Once)的数据输出。Cloud Storage 使用 partitioner 导出 Pulsar 数据,这就确保 Cloud Storage 连接器能够实现“只发送一次”数据,从而满足云存储的数据一致性要求。
-
支持所有数据(无论是否带有 schema 格式)。Cloud Storage 连接器支持将采用 Avro、JSON 或 Parquet 格式的数据写入云存储的对象中。通常情况下,只要数据结构支持 `Format` 接口,Cloud Storage 连接器就可以将该数据迁移到云平台。
-
支持基于时间的 partitioner。 Cloud Storage 连接器使用 Pulsar 消息的 `publishTime` 时间戳定义 `TimeBasedPartitioner` 类。支持天或小时级别的时间间隔。
-
支持多种对象存储类型。 Cloud Storage 连接器使用 jclouds 实现云存储。用户可以使用 jclouds 的对象存储拓展 JAR 包来支持更多服务提供商。如需自定义连接到其他服务提供商所需的密钥,可以通过服务提供商接口(Service Provider Interface,SPI)注册 `org.apache.pulsar.io.jcloud.credential.JcloudsCredential`。
为什么需要 Cloud Storage 连接器
Apache Pulsar 提供丰富的连接器生态系统,将 Pulsar 与其他数据系统连接起来。2018 年 8 月,Apache Pulsar 发布 Pulsar IO。Pulsar IO 允许用户利用现有的 Pulsar Functions 框架,在 Pulsar 和外部系统(例如 MySQL 、Kafka)之间传输数据。但是,有些用户希望将数据从 Apache Pulsar 迁移到云存储。这些用户被迫构建定制解决方案并手动运行它们。
为了解决这些问题,Apache Pulsar 正式引入 Cloud Storage 连接器。Cloud Storage 连接器利用 Pulsar IO 的诸多优势,例如容错性、并行性、弹性、负载平衡、按需更新等,帮助用户将 Pulsar 数据导入云存储。
Cloud Storage 连接器易于使用。用户无需编写任何代码,即可获得一款支持多种对象存储服务商、支持灵活的数据格式、自定义数据分区的对象存储连接器。
试用 Cloud Storage 连接器
本节介绍如何安装 Cloud Storage 连接器并使用 Cloud Storage 连接器向外部系统发送 Pulsar 消息。在本节举例中,我们使用 AWS S3 作为存储 Pulsar 数据的云平台。Cloud Storage 连接器采用基于时间的 partitioner,将 Pulsar 数据以 Parquet 的格式存储到 AWS S3。
前提条件
-
创建 AWS 账户,并登录 AWS Management Console。
-
创建 AWS S3 存储桶。有关详细信息,参见[创建存储桶](https://docs.aws.amazon.com/AmazonS3/latest/gsg/CreatingABucket.html)。
-
获取 AWS S3 存储桶的密钥。有关详细信息,参见[创建管理员 IAM 用户和组(控制台)](https://docs.aws.amazon.com/IAM/latest/UserGuide/getting-started_create-admin-group.html).
步骤 1:安装 Cloud Storage 连接器并运行 Pulsar Broker
-
[下载](https://github.com/streamnative/pulsar-io-cloud-storage/releases) Cloud Storage 连接器的安装包。
将 Cloud Storage 连接器的安装包,添加到 Pulsar broker 的配置文件中。
cp pulsar-io-cloud-storage-2.5.1.nar apache-pulsar-2.6.1/connectors/pulsar-io-cloud-storage-2.5.1.nar
使用 Pulsar broker 的配置文件,启动 Pulsar broker。
cd apache-pulsar-2.6.1
bin/pulsar standalone
步骤 2:配置并启动 Cloud Storage 连接器
-
创建 cloud-storage-sink-config.yaml 文件,并在文件中定义 Cloud Storage 连接器的配置,如下所示。
tenant: "public"
namespace: "default"
name: "cloud-storage-sink"
inputs:
- "user-avro-topic"
archive: "connectors/pulsar-io-cloud-storage-2.5.1.nar"
parallelism: 1
configs:
provider: "aws-s3",
accessKeyId: "accessKeyId"
secretAccessKey: "secretAccessKey"
role: ""
roleSessionName: ""
bucket: "s3-sink-test"
region: ""
endpoint: "us-standard"
formatType: "parquet"
partitionerType: "time"
timePartitionPattern: "yyyy-MM-dd"
timePartitionDuration: "1d"
batchSize: 10
batchTimeMs: 1000
将 cloud-storage-sink-config.yaml 文件中的 accessKeyId 和 secretAccessKey 的取值替换为 AWS 密钥。如需配置更多控制权限,可以设置 role 和 roleSessionName 字段。
-
使用 cloud-storage-sink-config.yaml`文件,在本地启动 Cloud Storage 连接器。
$PULSAR_HOME/bin/pulsar-admin sink localrun --sink-config-file cloud-storage-sink-config.yaml
步骤3:发送 Pulsar 消息
运行以下命令,发送 Pulsar 消息。Pulsar 消息采用 Avro schema 格式。目前,Pulsar 消息只支持 Avro 或 JSON schema。
try (
PulsarClient pulsarClient = PulsarClient.builder()
.serviceUrl("pulsar://localhost:6650")
.build();
Producer<TestRecord> producer = pulsarClient.newProducer(Schema.AVRO(TestRecord.class))
.topic("public/default/test-parquet-avro")
.create();
) {
List<TestRecord> testRecords = Arrays.asList(
new TestRecord("key1", 1, null),
new TestRecord("key2", 1, new TestRecord.TestSubRecord("aaa"))
);
for (TestRecord record : testRecords) {
producer.send(record);
}
}
步骤 4:验证 Pulsar 数据完整性
通过 AWS S3 Management Console,查看从 Pulsar 实时导入到 AWS S3 云平台中的数据。
结语
我们希望这篇文章能够引起您对 Cloud Storage 连接器的兴趣。Cloud Storage 连接器是一个开源项目,采用 Apache License V2。下载 Cloud Storage 连接器最新发布版本,开始使用 Cloud Storage。
如果在使用中遇到任何问题,可以在 Cloud Storage 连接器仓库中提交 issue,我们会在第一时间回应。同时,我们也欢迎您为 Cloud Storage sink 贡献特性。
猜你喜欢: