JD Finance Data Analysis Case (Part 2)


Copyright notice: this is an original post by the author and may not be reproduced without permission. https://blog.csdn.net/mingyunxiaohai/article/details/82703827

Continuing from the previous post, JD Finance Data Analysis Case (Part 1).

Task 5

Use Spark Streaming to analyze, in real time, the number of clicks on each page and the total amount spent by each age group.

Approach: write a Kafka producer program that reads the files on HDFS and produces data at regular intervals, then use Spark Streaming to read and analyze the data from Kafka, writing the results to Redis.
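
All of the programs below pull the broker address and topic names from a small constants class, KafkaRedisConfig, which is not listed in this post. A minimal sketch of what it might look like (the host names, ports, and topic names here are assumptions; adjust them to your own environment):

```
public class KafkaRedisConfig {
    // Kafka broker address (assumed to be a local single-broker setup)
    public static final String KAFKA_ADDR = "localhost:9092";
    // topic names used by the producers and the streaming jobs
    public static final String KAFKA_CLICK_TOPIC = "t_click";
    public static final String KAFKA_ORDER_TOPIC = "t_order";
    // Redis connection settings (assumed defaults)
    public static final String REDIS_HOST = "localhost";
    public static final int REDIS_PORT = 6379;
}
```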

(1) Write the t_click data into the Kafka topic t_click one record at a time, with an interval of 10 ms between records; the key is uid and the value is click_time + "," + pid.

```
public class ClickProducer {
    private static KafkaProducer<String, String> producer;

    public static void main(String[] args) {

        SparkConf conf = new SparkConf().setAppName("producer.ClickProducer");

        JavaSparkContext sc = new JavaSparkContext(conf);
        // read the click data from HDFS
        JavaRDD<String> input = sc.textFile("hdfs://master:9000/warehouse/t_click");

        // Kafka broker address (e.g. localhost:9092), defined as a constant
        String brokers = KafkaRedisConfig.KAFKA_ADDR;

        Map<String, Object> props = new HashMap<String, Object>();
        props.put("bootstrap.servers", brokers);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        producer = new KafkaProducer<String, String>(props);

        // iterate over the records and send them one at a time, sleeping between sends
        input.foreach(new VoidFunction<String>() {
            public void call(String s) throws Exception {
                String[] split = s.split(",");
                JSONObject event = new JSONObject();
                event.put("uid", split[1] + "," + split[2]);
                ProducerRecord<String, String> msg =
                        new ProducerRecord<String, String>(KafkaRedisConfig.KAFKA_CLICK_TOPIC, event.toString());
                producer.send(msg);

                System.out.println("Message sent: " + event);

                Thread.sleep(1000);
            }
        });
    }
}
```

Output:

```
{"uid":"2016-10-04 12:22:30,1"}
{"uid":"2016-08-09 11:24:13,9"}
{"uid":"2016-09-27 14:40:37,10"}
{"uid":"2016-10-04 12:18:42,6"}
......
```
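
Note that the task statement asks for uid to be used as the message key, but the ProducerRecord above only carries a value. If the key is needed, the record can be built with the three-argument constructor instead (assuming the first column of each t_click line is the uid):

```
ProducerRecord<String, String> msg = new ProducerRecord<String, String>(
        KafkaRedisConfig.KAFKA_CLICK_TOPIC,  // topic
        split[0],                            // key: uid (assumed to be the first column)
        event.toString());                   // value
```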

(2) Write the t_order data into the Kafka topic t_order one record at a time, with an interval of 10 ms between records; the key is uid and the value is uid + "," + price + "," + discount.

```
public class OrderProducer {
    private static KafkaProducer<String, String> producer;

    public static void main(String[] args) {

        SparkConf conf = new SparkConf().setAppName("producer.OrderProducer");

        JavaSparkContext sc = new JavaSparkContext(conf);
        // read the order data from HDFS
        JavaRDD<String> input = sc.textFile("hdfs://master:9000/warehouse/t_order");
        // the t_order topic, defined as a constant
        final String topic = KafkaRedisConfig.KAFKA_ORDER_TOPIC;
        // Kafka broker address (e.g. localhost:9092), defined as a constant
        String brokers = KafkaRedisConfig.KAFKA_ADDR;

        Map<String, Object> props = new HashMap<String, Object>();
        props.put("bootstrap.servers", brokers);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        producer = new KafkaProducer<String, String>(props);

        // iterate over the records, skip the header line, and send them one at a time, sleeping between sends
        input.foreach(new VoidFunction<String>() {
            public void call(String s) throws Exception {
                String[] split = s.split(",");
                JSONObject event = new JSONObject();
                if (!split[0].contains("uid")) {
                    event.put("uid", split[0] + "," + split[2] + "," + split[5]);
                    ProducerRecord<String, String> msg = new ProducerRecord<String, String>(topic, event.toString());
                    producer.send(msg);
                    System.out.println("Message sent: " + event);
                }

                Thread.sleep(2000);
            }
        });
    }
}
```

Output:

{"uid":"55792,1.4898961024,0"}
{"uid":"45370,3.9950093311,0"}
{"uid":"85278,0.6658123361,0"}
......

(3) Write a Spark Streaming program that reads the t_click topic from Kafka and counts the cumulative number of clicks on each page, storing the result in Redis with "click" + pid as the key and the cumulative count as the value.

```
public class ClickStreamingAnalysis {
    public static void main(String[] args) throws InterruptedException {
        SparkConf conf = new SparkConf().setAppName("ClickStreamingAnalysis");

        JavaStreamingContext ssc = new JavaStreamingContext(conf, Durations.seconds(5));
        ssc.sparkContext().setLogLevel("WARN");
        // Kafka configuration
        String[] topics = KafkaRedisConfig.KAFKA_CLICK_TOPIC.split("\\,");
        System.out.println("Topics: " + Arrays.toString(topics));
        String brokers = KafkaRedisConfig.KAFKA_ADDR;

        Map<String, String> kafkaParams = new HashMap<String, String>();
        kafkaParams.put("metadata.broker.list", brokers);
        kafkaParams.put("serializer.class", "kafka.serializer.StringEncoder");

        // create a direct stream, using KafkaUtils from spark-streaming-kafka-0-8_2.11
        JavaPairInputDStream<String, String> kafkaStream = KafkaUtils.createDirectStream(ssc,
                String.class, String.class,
                StringDecoder.class, StringDecoder.class,
                kafkaParams,
                new HashSet<String>(Arrays.asList(topics)));
        // each message looks like {"uid":"2016-10-04 12:22:30,1"}; parse it into a JSON object
        JavaDStream<JSONObject> events = kafkaStream.map(new Function<Tuple2<String, String>, JSONObject>() {
            public JSONObject call(Tuple2<String, String> line) throws Exception {
                System.out.println("line:" + line._2());
                JSONObject data = JSON.parseObject(line._2());
                return data;
            }
        });
        // extract the pid, map it to (pid, 1), then reduce to get this batch's click count per pid
        JavaPairDStream<String, Long> clickDs = events.mapToPair(new PairFunction<JSONObject, String, Long>() {
            public Tuple2<String, Long> call(JSONObject json) throws Exception {
//                System.out.println("clickUID:" + json.getString("uid"));
                return new Tuple2<String, Long>(json.getString("uid").split(",")[1], 1L);
            }
        }).reduceByKey(new Function2<Long, Long, Long>() {
            public Long call(Long v1, Long v2) throws Exception {
                return v1 + v2;
            }
        });
        // the Redis hash key that holds the counts
        final String clickHashKey = "pid::click";
        clickDs.foreachRDD(new VoidFunction<JavaPairRDD<String, Long>>() {
            public void call(JavaPairRDD<String, Long> rdd) throws Exception {
                rdd.foreachPartition(new VoidFunction<Iterator<Tuple2<String, Long>>>() {
                    public void call(Iterator<Tuple2<String, Long>> tuple2Iterator) throws Exception {
                        // creating the Jedis connection inside foreachPartition keeps the number of connections down
                        Jedis jedis = JavaRedisClient.get().getResource();
                        try {
                            while (tuple2Iterator.hasNext()) {
                                Tuple2<String, Long> next = tuple2Iterator.next();
                                String pid = "click" + next._1();
                                Long clickCount = next._2();
                                jedis.hincrBy(clickHashKey, pid, clickCount);
                                System.out.println(pid + ":" + clickCount);
                            }
                        } catch (Exception e) {
                            System.out.println("error:" + e);
                        }
                        // always close the connection when done, otherwise the pool leaks and the job hangs
                        jedis.close();
                    }
                });
            }
        });
        ssc.start();
        ssc.awaitTermination();
    }
}
```

Redis output:

```
127.0.0.1:6379> HGETALL age::money
 1) "30"
 2) "88.71581602079999521"
 3) "40"
 4) "33.95183371870000115"
 5) "35"
 ......
```
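
Both streaming jobs obtain their Redis connections from JavaRedisClient, a small pooled-connection helper that is not listed in this post. A minimal sketch built on a Jedis connection pool (the host and port here are assumptions):

```
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

public class JavaRedisClient {
    // a single shared JedisPool for the whole JVM
    private static final JedisPool pool =
            new JedisPool(new JedisPoolConfig(), "localhost", 6379);

    // callers do JavaRedisClient.get().getResource() and must close() the Jedis when done
    public static JedisPool get() {
        return pool;
    }
}
```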

(4) Write a Spark Streaming program that reads the t_order topic from Kafka and computes the total amount spent by each age group, storing the result in Redis with the age as the key and the cumulative amount as the value.

```
public class OrderStreamingAnalysis {
    public static void main(String[] args) throws InterruptedException {
        SparkConf conf = new SparkConf().setAppName("OrderStreamingAnalysis");
//        final SparkSession spark = SparkSession.builder().config(conf).getOrCreate();

        JavaStreamingContext ssc = new JavaStreamingContext(conf, Durations.seconds(2));
        // the Kafka messages look like {"uid":"55792,1.4898961024,0"} and carry no age,
        // so they have to be joined with the user table; since a program can only hold
        // one SparkContext, the SQLContext is built from the JavaStreamingContext's context
        final SQLContext sqlcontext = new SQLContext(ssc.sparkContext());

        ssc.sparkContext().setLogLevel("WARN");

        final Dataset<Row> userDs = sqlcontext.read().csv("hdfs://master:9000/warehouse/t_user");
        // define the schema of the user table
        StructType userSchema = new StructType()
                .add("uid", "string", false)
                .add("age", "string", false)
                .add("sex", "string", true)
                .add("active_date", "string", false)
                .add("limit", "string", false);
        final Dataset<Row> userDf = sqlcontext.createDataFrame(userDs.toJavaRDD(), userSchema);

        // Kafka configuration
        String[] topics = KafkaRedisConfig.KAFKA_ORDER_TOPIC.split("\\,");
        System.out.println("Topics: " + Arrays.toString(topics));
        String brokers = KafkaRedisConfig.KAFKA_ADDR;

        Set<String> topicsSet = new HashSet<String>(Arrays.asList(topics));
        Map<String, Object> kafkaParams = new HashMap<String, Object>();
        kafkaParams.put("bootstrap.servers", brokers);
        kafkaParams.put("key.deserializer", StringDeserializer.class);
        kafkaParams.put("value.deserializer", StringDeserializer.class);
        kafkaParams.put("group.id", "use_a_separate_group_id_for_each_stream");
        kafkaParams.put("auto.offset.reset", "latest");
        kafkaParams.put("enable.auto.commit", false);
        // create the DStream, using KafkaUtils from spark-streaming-kafka-0-10_2.11
        JavaInputDStream<ConsumerRecord<String, String>> kafkaStream =
                KafkaUtils.createDirectStream(
                        ssc,
                        LocationStrategies.PreferConsistent(),
                        ConsumerStrategies.<String, String>Subscribe(topicsSet, kafkaParams)
                );
        // parse the received strings into JSON objects
        JavaDStream<JSONObject> events = kafkaStream.map(new Function<ConsumerRecord<String, String>, JSONObject>() {
            public JSONObject call(ConsumerRecord<String, String> line) throws Exception {
                System.out.println("line:" + line.value());
                return JSON.parseObject(line.value());
            }
        });
        // extract the uid and the amount actually paid (price minus discount)
        JavaPairDStream<String, Double> orderDs = events.mapToPair(new PairFunction<JSONObject, String, Double>() {
            public Tuple2<String, Double> call(JSONObject json) throws Exception {
                String[] strs = json.getString("uid").split(",");
                return new Tuple2<String, Double>(strs[0], Double.parseDouble(strs[1]) - Double.parseDouble(strs[2]));
            }
        });
        orderDs.foreachRDD(new VoidFunction<JavaPairRDD<String, Double>>() {
            public void call(JavaPairRDD<String, Double> rdd) throws Exception {
                JavaRDD<Row> mapRow = rdd.map(new Function<Tuple2<String, Double>, Row>() {
                    public Row call(Tuple2<String, Double> v1) throws Exception {
                        String uid = v1._1();
                        Double money = v1._2();
//                        System.out.println("orderUID:" + uid + ":" + money);
                        return RowFactory.create(uid, money);
                    }
                });
                StructType orderSchema = new StructType()
                        .add("uid", "string", false)
                        .add("money", "double", false);
                Dataset<Row> orderDf = sqlcontext.createDataFrame(mapRow, orderSchema);
                // the Redis hash key that holds the totals
                final String moneyHashKey = "age::money";
                // join the orders with the user table, then sum the amount per age group
                Dataset<Row> count = orderDf.join(userDf, orderDf.col("uid").equalTo(userDf.col("uid")))
                        .select("age", "money")
                        .groupBy("age")
                        .sum("money");

                count.printSchema();
                count.repartition(3).foreachPartition(new ForeachPartitionFunction<Row>() {
                    public void call(Iterator<Row> t) throws Exception {
                        Jedis jedis = JavaRedisClient.get().getResource();
                        try {
                            while (t.hasNext()) {
                                Row row = t.next();
                                String age = row.getString(0);
                                Double money = row.getDouble(1);
                                System.out.println(age + "::::" + money);
                                jedis.hincrByFloat(moneyHashKey, age, money);
                            }
                        } catch (Exception e) {
                            System.out.println("error" + e);
                        }
                        jedis.close();
                    }
                });
            }
        });
        ssc.start();
        ssc.awaitTermination();
    }
}
```

Redis output:

```
127.0.0.1:6379> HGETALL age::money
 1) "30"
 2) "107.51128448799999422"
 3) "40"
 4) "39.40304406300000115"
 5) "35"
 6) "83.02971674589999735"
 ......
```
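
To read the aggregated results back in application code rather than through redis-cli, a plain Jedis call is enough. A small sketch, reusing the JavaRedisClient helper assumed above:

```
Jedis jedis = JavaRedisClient.get().getResource();
try {
    // page click counts: field "click"+pid -> cumulative count
    Map<String, String> clicks = jedis.hgetAll("pid::click");
    // total spend per age group: field age -> cumulative amount
    Map<String, String> spendByAge = jedis.hgetAll("age::money");
    System.out.println(clicks);
    System.out.println(spendByAge);
} finally {
    jedis.close();
}
```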

OK, done.

