Copyright notice: This is an original post by the author and may not be reposted without permission. https://blog.csdn.net/mingyunxiaohai/article/details/82703827
Continuing from the previous post, 京东金融数据分析案例(一) (JD Finance Data Analysis Case, Part 1).
Task 5

Use Spark Streaming to analyze, in real time, the number of clicks on each page and the total amount spent by each age group.

Approach: write a Kafka producer that reads the files on HDFS and emits a record at a fixed interval; a Spark Streaming job then reads the data from Kafka, runs the analysis, and writes the results to Redis.
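All of the code below references a small constants class, KafkaRedisConfig, that the original post never shows. A minimal sketch, assuming a single local broker and the topic names from the task (the values are placeholders to adjust for your deployment):

```java
// Hypothetical reconstruction: the original post only references this class.
public class KafkaRedisConfig {
    // Kafka broker list (assumption: one local broker)
    public static final String KAFKA_ADDR = "localhost:9092";
    // Topic names used by the producers and the streaming jobs
    public static final String KAFKA_CLICK_TOPIC = "t_click";
    public static final String KAFKA_ORDER_TOPIC = "t_order";
}
```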
(1) Write the t_click records one by one into the Kafka topic t_click, with an interval of 10 ms between records, using uid as the key and click_time + "," + pid as the value.
```java
import java.util.HashMap;
import java.util.Map;

import com.alibaba.fastjson.JSONObject;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.VoidFunction;

public class ClickProducer {
    private static KafkaProducer<String, String> producer;

    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("producer.ClickProducer");
        JavaSparkContext sc = new JavaSparkContext(conf);
        // Read the t_click data from HDFS
        JavaRDD<String> input = sc.textFile("hdfs://master:9000/warehouse/t_click");
        // Kafka broker address (e.g. localhost:9092), kept in the constants class
        String brokers = KafkaRedisConfig.KAFKA_ADDR;
        Map<String, Object> props = new HashMap<String, Object>();
        props.put("bootstrap.servers", brokers);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producer = new KafkaProducer<String, String>(props);
        // Send the records one by one. The task statement asks for a 10 ms interval;
        // this demo sleeps 1 s per record so the console output is easier to follow.
        // Note: referencing the static producer inside foreach only works in local mode;
        // on a cluster, create the producer inside foreachPartition instead.
        input.foreach(new VoidFunction<String>() {
            public void call(String s) throws Exception {
                String[] split = s.split(",");
                JSONObject event = new JSONObject();
                // The "uid" field carries click_time + "," + pid, which is what the consumer parses
                event.put("uid", split[1] + "," + split[2]);
                // Per the task spec, uid (split[0]) is the message key and the JSON string is the value
                ProducerRecord<String, String> msg = new ProducerRecord<String, String>(
                        KafkaRedisConfig.KAFKA_CLICK_TOPIC, split[0], event.toString());
                producer.send(msg);
                System.out.println("Message sent: " + event);
                Thread.sleep(1000);
            }
        });
    }
}
```
Sample output:
```
{"uid":"2016-10-04 12:22:30,1"}
{"uid":"2016-08-09 11:24:13,9"}
{"uid":"2016-09-27 14:40:37,10"}
{"uid":"2016-10-04 12:18:42,6"}
......
```
(2) Write the t_order records one by one into the Kafka topic t_order, with an interval of 10 ms between records, using uid as the key and uid + "," + price + "," + discount as the value.
```java
import java.util.HashMap;
import java.util.Map;

import com.alibaba.fastjson.JSONObject;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.VoidFunction;

public class OrderProducer {
    private static KafkaProducer<String, String> producer;

    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("producer.OrderProducer");
        JavaSparkContext sc = new JavaSparkContext(conf);
        // Read the t_order data from HDFS
        JavaRDD<String> input = sc.textFile("hdfs://master:9000/warehouse/t_order");
        // The t_order topic name, kept in the constants class
        final String topic = KafkaRedisConfig.KAFKA_ORDER_TOPIC;
        // Kafka broker address (e.g. localhost:9092), kept in the constants class
        String brokers = KafkaRedisConfig.KAFKA_ADDR;
        Map<String, Object> props = new HashMap<String, Object>();
        props.put("bootstrap.servers", brokers);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producer = new KafkaProducer<String, String>(props);
        // Send the records one by one (the task asks for 10 ms; this demo sleeps 2 s per record)
        input.foreach(new VoidFunction<String>() {
            public void call(String s) throws Exception {
                String[] split = s.split(",");
                JSONObject event = new JSONObject();
                // Skip the header row, whose first column is the literal "uid"
                if (!split[0].contains("uid")) {
                    // The "uid" field carries uid + "," + price + "," + discount
                    event.put("uid", split[0] + "," + split[2] + "," + split[5]);
                    // Per the task spec, uid (split[0]) is the message key and the JSON string is the value
                    ProducerRecord<String, String> msg =
                            new ProducerRecord<String, String>(topic, split[0], event.toString());
                    producer.send(msg);
                    System.out.println("Message sent: " + event);
                }
                Thread.sleep(2000);
            }
        });
    }
}
```
Sample output:
```
{"uid":"55792,1.4898961024,0"}
{"uid":"45370,3.9950093311,0"}
{"uid":"85278,0.6658123361,0"}
......
```
(3) Write a Spark Streaming job that reads the t_click topic from Kafka and keeps a running count of clicks per page, stored in Redis with "click" + pid as the key and the accumulated click count as the value.
```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import kafka.serializer.StringDecoder;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;
import redis.clients.jedis.Jedis;
import scala.Tuple2;

public class ClickStreamingAnalysis {
    public static void main(String[] args) throws InterruptedException {
        SparkConf conf = new SparkConf().setAppName("ClickStreamingAnalysis");
        JavaStreamingContext ssc = new JavaStreamingContext(conf, Durations.seconds(5));
        ssc.sparkContext().setLogLevel("WARN");
        // Kafka configuration
        String[] topics = KafkaRedisConfig.KAFKA_CLICK_TOPIC.split("\\,");
        System.out.println("Topics: " + Arrays.toString(topics));
        String brokers = KafkaRedisConfig.KAFKA_ADDR;
        Map<String, String> kafkaParams = new HashMap<String, String>();
        kafkaParams.put("metadata.broker.list", brokers);
        kafkaParams.put("serializer.class", "kafka.serializer.StringEncoder");
        // Create a direct stream with the KafkaUtils from spark-streaming-kafka-0-8_2.11
        JavaPairInputDStream<String, String> kafkaStream = KafkaUtils.createDirectStream(ssc,
                String.class, String.class,
                StringDecoder.class, StringDecoder.class,
                kafkaParams,
                new HashSet<String>(Arrays.asList(topics)));
        // Each record looks like {"uid":"2016-10-04 12:22:30,1"}; parse it into a JSON object
        JavaDStream<JSONObject> events = kafkaStream.map(new Function<Tuple2<String, String>, JSONObject>() {
            public JSONObject call(Tuple2<String, String> line) throws Exception {
                System.out.println("line:" + line._2());
                return JSON.parseObject(line._2());
            }
        });
        // Extract pid, map to (pid, 1), then reduce to get this batch's click count per pid
        JavaPairDStream<String, Long> clickDs = events.mapToPair(new PairFunction<JSONObject, String, Long>() {
            public Tuple2<String, Long> call(JSONObject json) throws Exception {
                return new Tuple2<String, Long>(json.getString("uid").split(",")[1], 1L);
            }
        }).reduceByKey(new Function2<Long, Long, Long>() {
            public Long call(Long v1, Long v2) throws Exception {
                return v1 + v2;
            }
        });
        // Redis hash that accumulates the counts
        final String clickHashKey = "pid::click";
        clickDs.foreachRDD(new VoidFunction<JavaPairRDD<String, Long>>() {
            public void call(JavaPairRDD<String, Long> rdd) throws Exception {
                rdd.foreachPartition(new VoidFunction<Iterator<Tuple2<String, Long>>>() {
                    // Creating the Jedis connection once per partition keeps the connection count down
                    public void call(Iterator<Tuple2<String, Long>> tuple2Iterator) throws Exception {
                        Jedis jedis = JavaRedisClient.get().getResource();
                        try {
                            while (tuple2Iterator.hasNext()) {
                                Tuple2<String, Long> next = tuple2Iterator.next();
                                String pid = "click" + next._1();
                                Long clickCount = next._2();
                                // HINCRBY adds this batch's count to the running total
                                jedis.hincrBy(clickHashKey, pid, clickCount);
                                System.out.println(pid + ":" + clickCount);
                            }
                        } catch (Exception e) {
                            System.out.println("error:" + e);
                        }
                        // Always return the connection when done, otherwise the pool leaks and the job hangs
                        jedis.close();
                    }
                });
            }
        });
        ssc.start();
        ssc.awaitTermination();
    }
}
```
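Both streaming jobs borrow Redis connections from JavaRedisClient, another helper the original post never shows. A minimal sketch, assuming a local Redis at localhost:6379 and a lazily created JedisPool (the pool sizes are placeholders):

```java
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

// Hypothetical reconstruction: the original post only references this class.
public class JavaRedisClient {
    private static JedisPool pool;

    // Returns a shared connection pool; callers do getResource()/close() per partition
    public static synchronized JedisPool get() {
        if (pool == null) {
            JedisPoolConfig config = new JedisPoolConfig();
            config.setMaxTotal(100); // at most 100 pooled connections
            config.setMaxIdle(10);
            // Redis address is an assumption; adjust to your deployment
            pool = new JedisPool(config, "localhost", 6379);
        }
        return pool;
    }
}
```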
Redis result (note: the capture below shows the age::money hash, which is written by the job in step (4); this job's click counts accumulate in the pid::click hash):
```
127.0.0.1:6379> HGETALL age::money
1) "30"
2) "88.71581602079999521"
3) "40"
4) "33.95183371870000115"
5) "35"
......
```
(4) Write a Spark Streaming job that reads the t_order topic from Kafka and computes the total amount spent per age group, stored in Redis with the age as the key and the accumulated spend as the value.
```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.ForeachPartitionFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import redis.clients.jedis.Jedis;
import scala.Tuple2;

public class OrderStreamingAnalysis {
    public static void main(String[] args) throws InterruptedException {
        SparkConf conf = new SparkConf().setAppName("OrderStreamingAnalysis");
        JavaStreamingContext ssc = new JavaStreamingContext(conf, Durations.seconds(2));
        // The Kafka records look like {"uid":"55792,1.4898961024,0"} and carry no age,
        // so they must be joined with the user table. Only one SparkContext may exist
        // per application, so reuse the one inside the JavaStreamingContext.
        final SQLContext sqlcontext = new SQLContext(ssc.sparkContext());
        ssc.sparkContext().setLogLevel("WARN");
        final Dataset<Row> userDs = sqlcontext.read().csv("hdfs://master:9000/warehouse/t_user");
        // Schema of the user table
        StructType userSchema = new StructType()
                .add("uid", "string", false)
                .add("age", "string", false)
                .add("sex", "string", true)
                .add("active_date", "string", false)
                .add("limit", "string", false);
        final Dataset<Row> userDf = sqlcontext.createDataFrame(userDs.toJavaRDD(), userSchema);
        // Kafka configuration
        String[] topics = KafkaRedisConfig.KAFKA_ORDER_TOPIC.split("\\,");
        System.out.println("Topics: " + Arrays.toString(topics));
        String brokers = KafkaRedisConfig.KAFKA_ADDR;
        Set<String> topicsSet = new HashSet<String>(Arrays.asList(topics));
        Map<String, Object> kafkaParams = new HashMap<String, Object>();
        kafkaParams.put("bootstrap.servers", brokers);
        kafkaParams.put("key.deserializer", StringDeserializer.class);
        kafkaParams.put("value.deserializer", StringDeserializer.class);
        kafkaParams.put("group.id", "use_a_separate_group_id_for_each_stream");
        kafkaParams.put("auto.offset.reset", "latest");
        kafkaParams.put("enable.auto.commit", false);
        // Create the DStream with the KafkaUtils from spark-streaming-kafka-0-10_2.11
        JavaInputDStream<ConsumerRecord<String, String>> kafkaStream =
                KafkaUtils.createDirectStream(
                        ssc,
                        LocationStrategies.PreferConsistent(),
                        ConsumerStrategies.<String, String>Subscribe(topicsSet, kafkaParams)
                );
        // Parse each received string into a JSON object
        JavaDStream<JSONObject> events = kafkaStream.map(new Function<ConsumerRecord<String, String>, JSONObject>() {
            public JSONObject call(ConsumerRecord<String, String> line) throws Exception {
                System.out.println("line:" + line.value());
                return JSON.parseObject(line.value());
            }
        });
        // Extract uid and the actual amount paid (price minus discount)
        JavaPairDStream<String, Double> orderDs = events.mapToPair(new PairFunction<JSONObject, String, Double>() {
            public Tuple2<String, Double> call(JSONObject json) throws Exception {
                String[] strs = json.getString("uid").split(",");
                return new Tuple2<String, Double>(strs[0], Double.parseDouble(strs[1]) - Double.parseDouble(strs[2]));
            }
        });
        orderDs.foreachRDD(new VoidFunction<JavaPairRDD<String, Double>>() {
            public void call(JavaPairRDD<String, Double> rdd) throws Exception {
                JavaRDD<Row> mapRow = rdd.map(new Function<Tuple2<String, Double>, Row>() {
                    public Row call(Tuple2<String, Double> v1) throws Exception {
                        return RowFactory.create(v1._1(), v1._2());
                    }
                });
                StructType orderSchema = new StructType()
                        .add("uid", "string", false)
                        .add("money", "double", false);
                Dataset<Row> orderDf = sqlcontext.createDataFrame(mapRow, orderSchema);
                // Redis hash that accumulates spend per age group
                final String moneyHashKey = "age::money";
                // Join with the user table, then sum this batch's spend per age group
                Dataset<Row> count = orderDf.join(userDf, orderDf.col("uid").equalTo(userDf.col("uid")))
                        .select("age", "money")
                        .groupBy("age")
                        .sum("money");
                count.printSchema();
                count.repartition(3).foreachPartition(new ForeachPartitionFunction<Row>() {
                    public void call(Iterator<Row> t) throws Exception {
                        Jedis jedis = JavaRedisClient.get().getResource();
                        try {
                            // Iterate over every row in the partition (an `if` here would drop rows)
                            while (t.hasNext()) {
                                Row row = t.next();
                                String age = row.getString(0);
                                Double money = row.getDouble(1);
                                System.out.println(age + "::::" + money);
                                // HINCRBYFLOAT adds this batch's total to the running total
                                jedis.hincrByFloat(moneyHashKey, age, money);
                            }
                        } catch (Exception e) {
                            System.out.println("error" + e);
                        }
                        // Always return the connection when done, otherwise the pool leaks and the job hangs
                        jedis.close();
                    }
                });
            }
        });
        ssc.start();
        ssc.awaitTermination();
    }
}
```
Redis result:
```
127.0.0.1:6379> HGETALL age::money
1) "30"
2) "107.51128448799999422"
3) "40"
4) "39.40304406300000115"
5) "35"
6) "83.02971674589999735"
......
```
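To read the accumulated results back from application code instead of redis-cli, a minimal Jedis sketch (the class name ResultReader and the Redis address are assumptions, not part of the original post):

```java
import java.util.Map;
import redis.clients.jedis.Jedis;

public class ResultReader {
    public static void main(String[] args) {
        // Assumption: Redis runs on localhost:6379, as elsewhere in this post
        Jedis jedis = new Jedis("localhost", 6379);
        // Read both result hashes written by the streaming jobs
        Map<String, String> clicks = jedis.hgetAll("pid::click");
        Map<String, String> spendByAge = jedis.hgetAll("age::money");
        System.out.println("clicks per page: " + clicks);
        System.out.println("spend per age group: " + spendByAge);
        jedis.close();
    }
}
```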
Done.