flink 热词统计(2): 使用Kafka

栏目: 编程工具 · 发布时间: 5年前

内容简介:本次改造将仅改造数据输入流和输出流的部分上篇文章我们定义了数据格式,基于此我们来定义kafka的传递数据的格式,即为由于引入了Kafka,我们就要传入参数,这里解析参数用的是官方采用的方法

本次改造将仅改造数据输入流和输出流的部分

上篇文章我们定义了数据格式,基于此我们来定义kafka的传递数据的格式,即为 ${timetamp},${word} ,中间用逗号分隔,好吧我承认看起来是和当时我们定义数据格式是一样的,但是这里要注意的是分隔符的选取 因为我们这里不是复杂的业务场景,不需要用上json,用常用的列分隔符就好,比如csv中用到的逗号,但是这里要注意的是你的 ${word} 里是否会包含有你所选取的分隔符,这点很关键

消息传入改为Kafka

由于引入了Kafka,我们就要传入参数,这里解析参数用的是官方采用的方法

val params = ParameterTool.fromArgs(args)
复制代码

然后我们用参数来创建一个FlinkKafkaConsumer011对象,通过env.addSource添加到环境中

val dataStream = env.addSource(
      new FlinkKafkaConsumer011[(String)](
        params.getRequired("input.topic"),new SimpleStringSchema, params.getProperties)).uid("add-source")
复制代码

这里我们只是用了SimpleStringSchema来解析Kafka中消息为字符串,我们还需要一步map操作进行数据转换

.map{x =>
        val arr = x.split(",")
        if(arr.length != 2){
		  println(s"解析${x}失败")
          return null
        }else {
          (arr(0), arr(1))
        }
  }.filter(_ != null)
复制代码

这里做了一个容错处理,如果解析失败的话,就会返回一个null,意即把一个null数据传递给下一个operator,然后我们再用一个filter对它进行过滤

这一步我们做完了,就可以开始测试一波了

--bootstrap.servers localhost:9092
--group.id flink
--input.topic flink-searchTrend-source
复制代码

我们将参数存在txt文档里,方便以后复制 在本地调试时可以将参数粘贴进如图所示的输入框中,然后运行main方法

flink 热词统计(2): 使用Kafka
flink 热词统计(2): 使用Kafka

这里调试的时候建议用kafka的命令行命令,如下

./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic flink-hotWordStatisticJob-source
复制代码

这里我们先把对消息传入的这一环节改造的部分调通,再进行下一步骤

消息输出改为Kafka

我们把之前计算后的DataStream赋值给resultDataStream,然后给其设定数据输出口,即Sink,当然事先也得把它变成字符串

resultDataStream.map(x => s"${x._2},${x._3}").addSink(
      new FlinkKafkaProducer011[String](
        params.getRequired("output.topic"),new SimpleStringSchema,params.getProperties
      )
    )
复制代码

参数中增加一个

--output.topic flink-hotWordStatisticJob-sink
复制代码

最后的效果

输入消息

flink 热词统计(2): 使用Kafka

接受消息

./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic flink-hotWordStatisticJob-sink
复制代码
flink 热词统计(2): 使用Kafka

结尾

如果产品设计的需求就是热词统计一直累加的话,那么写到这,代码就可以部署至生产了,当然他以后肯定是会变的


以上所述就是小编给大家介绍的《flink 热词统计(2): 使用Kafka》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

高可用架构(第1卷)

高可用架构(第1卷)

高可用架构社区 / 电子工业出版社 / 2017-11-1 / 108.00元

《高可用架构(第1卷)》由数十位一线架构师的实践与经验凝结而成,选材兼顾技术性、前瞻性与专业深度。各技术焦点,均由极具代表性的领域专家或实践先行者撰文深度剖析,共同组成“高可用”的全局视野与领先高度,内容包括精华案例、分布式原理、电商架构等热门专题,及云计算、容器、运维、大数据、安全等重点方向。不仅架构师可以从中受益,其他IT、互联网技术从业者同样可以得到提升。一起来看看 《高可用架构(第1卷)》 这本书的介绍吧!

XML、JSON 在线转换
XML、JSON 在线转换

在线XML、JSON转换工具

Markdown 在线编辑器
Markdown 在线编辑器

Markdown 在线编辑器

HEX CMYK 转换工具
HEX CMYK 转换工具

HEX CMYK 互转工具