学习 Flink(十三):Cassandra Connector

栏目: 数据库 · 发布时间: 6年前

内容简介:Flink 支持编辑Cassandra Sink 底层使用了 DataStax Java Driver。支持 CQL + Tuple 和 ORM 两种方式写入 Cassandra。

Flink 支持 Cassandra 作为 Sink。

依赖

编辑 pom.xml 文件,添加依赖:

<dependency>  
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-cassandra_2.11</artifactId>
  <version>1.8.0</version>
</dependency>

Sink

Cassandra Sink 底层使用了 DataStax Java Driver。支持 CQL + Tuple 和 ORM 两种方式写入 Cassandra。

CQL + Tuple

已知 Tuple 有两个元素,第一个元素为 id,第二个元素为 name。

写入:chestnut::

CassandraSink.addSink(stream)  
        .setHost("127.0.0.1")
        .setClusterBuilder(new ClusterBuilder() {
            @Override
            protected Cluster buildCluster(Cluster.Builder builder) {
                return builder.withCredentials("username", "password").build();
            }
        })
        .setQuery("INSERT INTO dm.user(id, name) values (?, ?);")
        .build();

ORM

定义 Java Bean:

@Table(keyspace = "dm", name = "user")
public class Test {

    @Column(name = "id")
    private Long id;

    @Column(name = "name")
    private String name;

    public Long getId() {
        return id;
    }

    public void setId(Long id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }
}

写入:chestnut::

CassandraSink.addSink(stream)  
        .setHost("127.0.0.1")
        .setClusterBuilder(new ClusterBuilder() {
            @Override
            protected Cluster buildCluster(Cluster.Builder builder) {
                return builder.withCredentials("username", "password").build();
            }
        })
        .setMapperOptions(() -> new Mapper.Option[]{Mapper.Option.saveNullFields(true)})
        .build();

Q&A

本地运行,报错 java: cannot access org.apache.flink.streaming.api.scala.DataStream

编辑 pom.xml 文件,添加依赖:

<dependency>  
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>

异常 java.lang.IllegalArgumentException: No support for the type of the given DataStream: GenericType

CassandraSink 输入类型必须为以下任意一种:

  • Flink Java Tuple
  • Scala case classe
  • Row
  • POJO

详情参考文档: Data Types & Serialization - Apache Flink Document

参考


以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

一个人的电商

一个人的电商

许晓辉 / 电子工业出版社 / 2015-5-1 / CNY 59.00

首次披露电商的运营与操盘内幕,徐小平、梁宁作序,雷军、陈彤、张向东、刘韧、王峰力荐! 这个时代在经历前所未有的转型甚至颠覆,任何行业都将与互联网无缝融合,成为“互联网+”。有很多写电商的书,大多都用浓墨重彩阐释互联网转型的必要性,而讲到如何落地实操则浅尝即止,令人心潮澎拜之后不知如何下手。于是有了这本既有方法论,更重视实操细节的书。 许晓辉,在知名电商公司凡客诚品做过高管,有海......一起来看看 《一个人的电商》 这本书的介绍吧!

RGB HSV 转换
RGB HSV 转换

RGB HSV 互转工具

RGB CMYK 转换工具
RGB CMYK 转换工具

RGB CMYK 互转工具

HEX HSV 转换工具
HEX HSV 转换工具

HEX HSV 互换工具