spark kafka consumer 消费数据的二种方式

栏目: 编程工具 · 发布时间: 6年前

内容简介:如果实时的从kafka取数据,通过spark入hdfs,会产生很多的task,在hdfs上会产生非常多的小文件。浪费硬盘空间不说,在用spark进行数据分析的时间,非常耗spark节点的内存。最好不要实时的入库,间断去运行。这样会尽量减少小文件的产生。但是不能根本上解决小问题,最终还是通过CombineFileInputFormat来解决,这个后面的文章,会单独说。这种方式也是最容易想到的一种方式,spark-submit提交后,启动sparksession,启动kafka consumer,消费数据。这儿

如果实时的从kafka取数据,通过spark入hdfs,会产生很多的task,在hdfs上会产生非常多的小文件。浪费硬盘空间不说,在用spark进行数据分析的时间,非常耗spark节点的内存。

最好不要实时的入库,间断去运行。这样会尽量减少小文件的产生。但是不能根本上解决小问题,最终还是通过CombineFileInputFormat来解决,这个后面的文章,会单独说。

1,采crontab的方式

这种方式也是最容易想到的一种方式,spark-submit提交后,启动sparksession,启动kafka consumer,消费数据。这儿有一点要注意,数据入hdfs后,关闭sparksession,kafka consumer,这样可以节约系统资源。

2,采用akka包,处理方式,根crontab差不多(推荐)

pom.xml加载包,注意和当前scala版本要对的上,不然打包时会报错

<dependency>
 <groupId>com.typesafe.akka</groupId>
 <artifactId>akka-actor_2.11</artifactId>
 <version>2.5.9</version>
</dependency>

例子:

object test {
    def main(args: Array[String]): Unit = {
        。。。。。。。。。。。。。。。。省略。。。。。。。。。。。。。。。
        val consumer: KafkaConsumer[String, String] = new KafkaConsumer[String, String](pros)
        /*这里填写主题名称*/
        consumer.subscribe(util.Arrays.asList(table))
        val system = akka.actor.ActorSystem("system")
        system.scheduler.schedule(0 seconds, 180 seconds)(taskerPc.saveData(args,consumer))
    }

    object taskerPc {
        def saveData(args: Array[String],consumer: KafkaConsumer[String,String]): Unit = {
            。。。。。。。。。。。。。。。。省略。。。。。。。。。。。。。。。
            /*
            *
            * spark.sql.warehouse.dir hdfs://主数据节点别名或者ip:post指定单个主机/
            * */
            val spark = new sql.SparkSession.Builder()
                    .config("spark.sql.warehouse.dir", func.cnf("spark.sql.warehouse.dir"))
                    .enableHiveSupport()
                    .appName(table)
                    .getOrCreate()

            val records: ConsumerRecords[String, String] = consumer.poll(Duration.ofSeconds(3))
            。。。。。。。。。。。。。。。。省略。。。。。。。。。。。。。。。
        }
    }
}

scala main函数中,开起了一个kafka consumer,会每隔180秒,去调用函数saveData,这种方式,consumer是不能关闭的,一关闭就无法消费topic里面的数据了。如果把val consumer: KafkaConsumer[String, String] = new KafkaConsumer[String, String](pros),放到了saveData中,就要关闭consumer,不然就会出现Attempt to heartbeat failed since group is rebalancing问题。


以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

第四次革命

第四次革命

[意]卢西亚诺•弗洛里迪(Luciano Floridi)著 / 王文革 / 浙江人民出版社 / 2016-5 / 64.90元

 随着线上线下大融合以及人工智能的极大发展,人类已经进入超历史时代。在这一时代中,人类终于迎来了继哥白尼革命、达尔文革命、神经科学革命之后自我认知的第四次革命——图灵革命,整个世界正化身为一个信息圈,每个人都生活在云端,人类已不再是信息圈毋庸置疑的主宰。毫无疑问,图灵革命引爆了人工智能重塑整个人类社会的序曲!  那么在人工智能时代,人类如何保证自己最钟爱的财富——“隐私”不被窃取?如何应......一起来看看 《第四次革命》 这本书的介绍吧!

JS 压缩/解压工具
JS 压缩/解压工具

在线压缩/解压 JS 代码

SHA 加密
SHA 加密

SHA 加密工具

正则表达式在线测试
正则表达式在线测试

正则表达式在线测试