Copyright notice: this is an original article by the blogger and may not be reproduced without permission. https://blog.csdn.net/mingyunxiaohai/article/details/82703827
Continuing from the previous post, JD Finance Data Analysis Case (Part 1).
Task 5
Use Spark Streaming to analyze, in real time, the number of clicks on each page and the total spending of each age group.
Steps: write Kafka producer programs that read the files on HDFS and emit one record at a fixed interval, then use Spark Streaming to read the data from Kafka, analyze it, and write the results into Redis.
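All of the programs below reference two helper classes, `KafkaRedisConfig` and `JavaRedisClient`, which the post itself never shows. The following is only a minimal sketch of what they might look like, assuming a Kafka broker on localhost:9092 and Redis on localhost:6379; the constant values are placeholders, not the author's actual configuration.

```java
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

// KafkaRedisConfig.java (assumed): shared constants used by the producers and streaming jobs.
class KafkaRedisConfig {
    public static final String KAFKA_ADDR = "localhost:9092";   // Kafka broker address (placeholder)
    public static final String KAFKA_CLICK_TOPIC = "t_click";   // topic for click events
    public static final String KAFKA_ORDER_TOPIC = "t_order";   // topic for order events
}

// JavaRedisClient.java (assumed): a singleton JedisPool, matching the
// JavaRedisClient.get().getResource() calls in the streaming jobs below.
class JavaRedisClient {
    private static final JedisPool POOL =
            new JedisPool(new JedisPoolConfig(), "localhost", 6379);

    public static JedisPool get() {
        return POOL;
    }
}
```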
(1) Write the t_click data into the Kafka topic t_click record by record, with an interval of 10 ms between records, where uid is the key and click_time + "," + pid is the value.
```java
import java.util.HashMap;
import java.util.Map;

import com.alibaba.fastjson.JSONObject;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.VoidFunction;

public class ClickProducer {

    private static KafkaProducer<String, String> producer;

    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("producer.ClickProducer");
        JavaSparkContext sc = new JavaSparkContext(conf);

        // Read the click data from HDFS
        JavaRDD<String> input = sc.textFile("hdfs://master:9000/warehouse/t_click");

        // Kafka broker address (e.g. localhost:9092), defined as a constant
        String brokers = KafkaRedisConfig.KAFKA_ADDR;
        Map<String, Object> props = new HashMap<String, Object>();
        props.put("bootstrap.servers", brokers);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producer = new KafkaProducer<String, String>(props);

        // Iterate over the records and send one message per record, sleeping briefly after each send
        input.foreach(new VoidFunction<String>() {
            public void call(String s) throws Exception {
                String[] split = s.split(",");
                JSONObject event = new JSONObject();
                // The value is click_time + "," + pid, wrapped in a JSON object under "uid"
                event.put("uid", split[1] + "," + split[2]);
                ProducerRecord<String, String> msg =
                        new ProducerRecord<String, String>(KafkaRedisConfig.KAFKA_CLICK_TOPIC, event.toString());
                producer.send(msg);
                System.out.println("Message sent: " + event);
                Thread.sleep(1000);
            }
        });
    }
}
```
Output:
``` {"uid":"2016-10-04 12:22:30,1"} {"uid":"2016-08-09 11:24:13,9"} {"uid":"2016-09-27 14:40:37,10"} {"uid":"2016-10-04 12:18:42,6"} ......
(2) Write the t_order data into the Kafka topic t_order record by record, with an interval of 10 ms between records, where uid is the key and uid + "," + price + "," + discount is the value.
```java
import java.util.HashMap;
import java.util.Map;

import com.alibaba.fastjson.JSONObject;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.VoidFunction;

public class OrderProducer {

    private static KafkaProducer<String, String> producer;

    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("producer.OrderProducer");
        JavaSparkContext sc = new JavaSparkContext(conf);

        // Read the order data from HDFS
        JavaRDD<String> input = sc.textFile("hdfs://master:9000/warehouse/t_order");

        // The t_order topic, defined as a constant
        final String topic = KafkaRedisConfig.KAFKA_ORDER_TOPIC;

        // Kafka broker address (e.g. localhost:9092), defined as a constant
        String brokers = KafkaRedisConfig.KAFKA_ADDR;
        Map<String, Object> props = new HashMap<String, Object>();
        props.put("bootstrap.servers", brokers);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producer = new KafkaProducer<String, String>(props);

        // Iterate over the records and send one message per record, sleeping briefly after each send
        input.foreach(new VoidFunction<String>() {
            public void call(String s) throws Exception {
                String[] split = s.split(",");
                JSONObject event = new JSONObject();
                // Skip the header row; the value is uid + "," + price + "," + discount
                if (!split[0].contains("uid")) {
                    event.put("uid", split[0] + "," + split[2] + "," + split[5]);
                    ProducerRecord<String, String> msg =
                            new ProducerRecord<String, String>(topic, event.toString());
                    producer.send(msg);
                    System.out.println("Message sent: " + event);
                }
                Thread.sleep(2000);
            }
        });
    }
}
```
Output:
{"uid":"55792,1.4898961024,0"} {"uid":"45370,3.9950093311,0"} {"uid":"85278,0.6658123361,0"} ......
(3) Write a Spark Streaming program that reads the t_click topic from Kafka and computes the cumulative number of clicks on each page, storing the result in Redis with "click" + pid as the key and the cumulative count as the value.
```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import kafka.serializer.StringDecoder;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;
import redis.clients.jedis.Jedis;
import scala.Tuple2;

public class ClickStreamingAnalysis {

    public static void main(String[] args) throws InterruptedException {
        SparkConf conf = new SparkConf().setAppName("ClickStreamingAnalysis");
        JavaStreamingContext ssc = new JavaStreamingContext(conf, Durations.seconds(5));
        ssc.sparkContext().setLogLevel("WARN");

        // Kafka configuration
        String[] topics = KafkaRedisConfig.KAFKA_CLICK_TOPIC.split("\\,");
        System.out.println("Topics: " + Arrays.toString(topics));
        String brokers = KafkaRedisConfig.KAFKA_ADDR;
        Map<String, String> kafkaParams = new HashMap<String, String>();
        kafkaParams.put("metadata.broker.list", brokers);
        kafkaParams.put("serializer.class", "kafka.serializer.StringEncoder");

        // Create a direct stream, using KafkaUtils from spark-streaming-kafka-0-8_2.11
        JavaPairInputDStream<String, String> kafkaStream = KafkaUtils.createDirectStream(
                ssc,
                String.class, String.class,
                StringDecoder.class, StringDecoder.class,
                kafkaParams,
                new HashSet<String>(Arrays.asList(topics)));

        // Incoming records look like {"uid":"2016-10-04 12:22:30,1"}; parse each one into a JSON object
        JavaDStream<JSONObject> events = kafkaStream.map(new Function<Tuple2<String, String>, JSONObject>() {
            public JSONObject call(Tuple2<String, String> line) throws Exception {
                System.out.println("line:" + line._2());
                JSONObject data = JSON.parseObject(line._2());
                return data;
            }
        });

        // Extract the pid, map to (pid, 1), then reduce by key to get this batch's click count per page
        JavaPairDStream<String, Long> clickDs = events.mapToPair(new PairFunction<JSONObject, String, Long>() {
            public Tuple2<String, Long> call(JSONObject json) throws Exception {
                // System.out.println("clickUID:" + json.getString("uid"));
                return new Tuple2<String, Long>(json.getString("uid").split(",")[1], 1L);
            }
        }).reduceByKey(new Function2<Long, Long, Long>() {
            public Long call(Long v1, Long v2) throws Exception {
                return v1 + v2;
            }
        });

        // Redis hash key that accumulates the per-page click counts
        final String clickHashKey = "pid::click";

        clickDs.foreachRDD(new VoidFunction<JavaPairRDD<String, Long>>() {
            public void call(JavaPairRDD<String, Long> rdd) throws Exception {
                rdd.foreachPartition(new VoidFunction<Iterator<Tuple2<String, Long>>>() {
                    public void call(Iterator<Tuple2<String, Long>> tuple2Iterator) throws Exception {
                        // Getting the Jedis connection inside foreachPartition keeps the number of connections low
                        Jedis jedis = JavaRedisClient.get().getResource();
                        try {
                            while (tuple2Iterator.hasNext()) {
                                Tuple2<String, Long> next = tuple2Iterator.next();
                                String pid = "click" + next._1();
                                Long clickCount = next._2();
                                jedis.hincrBy(clickHashKey, pid, clickCount);
                                System.out.println(pid + ":" + clickCount);
                            }
                        } catch (Exception e) {
                            System.out.println("error:" + e);
                        }
                        // Always return the connection when done, otherwise the pool leaks and the job hangs
                        jedis.close();
                    }
                });
            }
        });

        ssc.start();
        ssc.awaitTermination();
    }
}
```
Result in Redis:
127.0.0.1:6379> HGETALL age::money 1) "30" 2) "88.71581602079999521" 3) "40" 4) "33.95183371870000115" 5) "35" ......
(4) Write a Spark Streaming program that reads the t_order topic from Kafka and computes the total spending of each age group, storing the result in Redis with the age as the key and the cumulative spending as the value.
```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.ForeachPartitionFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import redis.clients.jedis.Jedis;
import scala.Tuple2;

public class OrderStreamingAnalysis {

    public static void main(String[] args) throws InterruptedException {
        SparkConf conf = new SparkConf().setAppName("OrderStreamingAnalysis");
        // final SparkSession spark = SparkSession.builder().config(conf).getOrCreate();
        JavaStreamingContext ssc = new JavaStreamingContext(conf, Durations.seconds(2));

        // Records from Kafka look like {"uid":"55792,1.4898961024,0"} and carry no age,
        // so they must be joined with the user table. Since a JVM can hold only one SparkContext,
        // the SQLContext is built from the JavaStreamingContext's context.
        final SQLContext sqlcontext = new SQLContext(ssc.sparkContext());
        ssc.sparkContext().setLogLevel("WARN");

        final Dataset<Row> userDs = sqlcontext.read().csv("hdfs://master:9000/warehouse/t_user");
        // Schema for the user table
        StructType userSchema = new StructType()
                .add("uid", "string", false)
                .add("age", "string", false)
                .add("six", "string", true)
                .add("active_date", "string", false)
                .add("limit", "string", false);
        final Dataset<Row> userDf = sqlcontext.createDataFrame(userDs.toJavaRDD(), userSchema);

        // Kafka configuration
        String[] topics = KafkaRedisConfig.KAFKA_ORDER_TOPIC.split("\\,");
        System.out.println("Topics: " + Arrays.toString(topics));
        String brokers = KafkaRedisConfig.KAFKA_ADDR;
        Set<String> topicsSet = new HashSet<String>(Arrays.asList(topics));
        Map<String, Object> kafkaParams = new HashMap<String, Object>();
        kafkaParams.put("bootstrap.servers", brokers);
        kafkaParams.put("key.deserializer", StringDeserializer.class);
        kafkaParams.put("value.deserializer", StringDeserializer.class);
        kafkaParams.put("group.id", "use_a_separate_group_id_for_each_stream");
        kafkaParams.put("auto.offset.reset", "latest");
        kafkaParams.put("enable.auto.commit", false);

        // Create the DStream, using KafkaUtils from spark-streaming-kafka-0-10_2.11
        JavaInputDStream<ConsumerRecord<String, String>> kafkaStream = KafkaUtils.createDirectStream(
                ssc,
                LocationStrategies.PreferConsistent(),
                ConsumerStrategies.<String, String>Subscribe(topicsSet, kafkaParams)
        );

        // Parse each record's value into a JSON object
        JavaDStream<JSONObject> events = kafkaStream.map(new Function<ConsumerRecord<String, String>, JSONObject>() {
            public JSONObject call(ConsumerRecord<String, String> line) throws Exception {
                System.out.println("line:" + line.value());
                return JSON.parseObject(line.value());
            }
        });

        // Extract uid and the amount actually paid (price - discount)
        JavaPairDStream<String, Double> orderDs = events.mapToPair(new PairFunction<JSONObject, String, Double>() {
            public Tuple2<String, Double> call(JSONObject json) throws Exception {
                String[] strs = json.getString("uid").split(",");
                return new Tuple2<String, Double>(strs[0], Double.parseDouble(strs[1]) - Double.parseDouble(strs[2]));
            }
        });

        orderDs.foreachRDD(new VoidFunction<JavaPairRDD<String, Double>>() {
            public void call(JavaPairRDD<String, Double> rdd) throws Exception {
                // Turn the batch into Rows so it can be joined with the user DataFrame
                JavaRDD<Row> mapRow = rdd.map(new Function<Tuple2<String, Double>, Row>() {
                    public Row call(Tuple2<String, Double> v1) throws Exception {
                        String uid = v1._1();
                        Double money = v1._2();
                        // System.out.println("orderUID:" + uid + ":" + money);
                        return RowFactory.create(uid, money);
                    }
                });
                StructType orderSchema = new StructType()
                        .add("uid", "string", false)
                        .add("money", "double", false);
                Dataset<Row> orderDf = sqlcontext.createDataFrame(mapRow, orderSchema);

                // Redis hash key that accumulates spending per age group
                final String moneyHashKey = "age::money";

                // Join with the user table, then sum the amount per age group for this batch
                Dataset<Row> count = orderDf.join(userDf, orderDf.col("uid").equalTo(userDf.col("uid")))
                        .select("age", "money")
                        .groupBy("age")
                        .sum("money");
                count.printSchema();

                count.repartition(3).foreachPartition(new ForeachPartitionFunction<Row>() {
                    public void call(Iterator<Row> t) throws Exception {
                        Jedis jedis = JavaRedisClient.get().getResource();
                        try {
                            // Iterate over every row in the partition and accumulate the amounts in Redis
                            while (t.hasNext()) {
                                Row row = t.next();
                                String age = row.getString(0);
                                Double money = row.getDouble(1);
                                System.out.println(age + "::::" + money);
                                jedis.hincrByFloat(moneyHashKey, age, money);
                            }
                        } catch (Exception e) {
                            System.out.println("error" + e);
                        }
                        jedis.close();
                    }
                });
            }
        });

        ssc.start();
        ssc.awaitTermination();
    }
}
```
Result in Redis:
```
127.0.0.1:6379> HGETALL age::money
1) "30"
2) "107.51128448799999422"
3) "40"
4) "39.40304406300000115"
5) "35"
6) "83.02971674589999735"
......
```
Done.