内容简介:如果实时的从kafka取数据,通过spark入hdfs,会产生很多的task,在hdfs上会产生非常多的小文件。浪费硬盘空间不说,在用spark进行数据分析的时间,非常耗spark节点的内存。最好不要实时的入库,间断去运行。这样会尽量减少小文件的产生。但是不能根本上解决小问题,最终还是通过CombineFileInputFormat来解决,这个后面的文章,会单独说。这种方式也是最容易想到的一种方式,spark-submit提交后,启动sparksession,启动kafka consumer,消费数据。这儿
如果实时的从kafka取数据,通过spark入hdfs,会产生很多的task,在hdfs上会产生非常多的小文件。浪费硬盘空间不说,在用spark进行数据分析的时间,非常耗spark节点的内存。
最好不要实时的入库,间断去运行。这样会尽量减少小文件的产生。但是不能根本上解决小问题,最终还是通过CombineFileInputFormat来解决,这个后面的文章,会单独说。
1,采crontab的方式
这种方式也是最容易想到的一种方式,spark-submit提交后,启动sparksession,启动kafka consumer,消费数据。这儿有一点要注意,数据入hdfs后,关闭sparksession,kafka consumer,这样可以节约系统资源。
2,采用akka包,处理方式,根crontab差不多(推荐)
pom.xml加载包,注意和当前scala版本要对的上,不然打包时会报错
<dependency> <groupId>com.typesafe.akka</groupId> <artifactId>akka-actor_2.11</artifactId> <version>2.5.9</version> </dependency>
例子:
object test { def main(args: Array[String]): Unit = { 。。。。。。。。。。。。。。。。省略。。。。。。。。。。。。。。。 val consumer: KafkaConsumer[String, String] = new KafkaConsumer[String, String](pros) /*这里填写主题名称*/ consumer.subscribe(util.Arrays.asList(table)) val system = akka.actor.ActorSystem("system") system.scheduler.schedule(0 seconds, 180 seconds)(taskerPc.saveData(args,consumer)) } object taskerPc { def saveData(args: Array[String],consumer: KafkaConsumer[String,String]): Unit = { 。。。。。。。。。。。。。。。。省略。。。。。。。。。。。。。。。 /* * * spark.sql.warehouse.dir hdfs://主数据节点别名或者ip:post指定单个主机/ * */ val spark = new sql.SparkSession.Builder() .config("spark.sql.warehouse.dir", func.cnf("spark.sql.warehouse.dir")) .enableHiveSupport() .appName(table) .getOrCreate() val records: ConsumerRecords[String, String] = consumer.poll(Duration.ofSeconds(3)) 。。。。。。。。。。。。。。。。省略。。。。。。。。。。。。。。。 } } }
scala main函数中,开起了一个kafka consumer,会每隔180秒,去调用函数saveData,这种方式,consumer是不能关闭的,一关闭就无法消费topic里面的数据了。如果把val consumer: KafkaConsumer[String, String] = new KafkaConsumer[String, String](pros),放到了saveData中,就要关闭consumer,不然就会出现Attempt to heartbeat failed since group is rebalancing问题。
以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网
猜你喜欢:- Java实现生产者消费者问题的多种方式
- 生产者/消费者问题的多种Java实现方式
- Spring Cloud Alibaba基础教程:支持的几种服务消费方式(RestTemplate、WebClient、Feign)
- 消费端如何保证消息队列MQ的有序消费
- 《吊打面试官》系列-分布式事务、重复消费、顺序消费
- 十一贝:航延险智能判定,公平消费环境惠及消费者
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
人人都是架构师:分布式系统架构落地与瓶颈突破
高翔龙 / 电子工业出版社 / 2017-5 / 69
《人人都是架构师:分布式系统架构落地与瓶颈突破》并没有过多渲染系统架构的理论知识,而是切切实实站在开发一线角度,为各位读者诠释了大型网站在架构演变过程中出现一系列技术难题时的解决方案。《人人都是架构师:分布式系统架构落地与瓶颈突破》首先从分布式服务案例开始介绍,重点为大家讲解了大规模服务化场景下企业应该如何实施服务治理;然后在大流量限流/消峰案例中,笔者为大家讲解了应该如何有效地对流量实施管制,避......一起来看看 《人人都是架构师:分布式系统架构落地与瓶颈突破》 这本书的介绍吧!