一、使用hive load hdfs上的清洗的数据。 4861 + 9666
--------------------------------------------- ----------------------
1.动态添加表分区
-- The partition values mirror the cleaned-data directory (year/month/day/hour/minute) loaded in step 2,
-- so minute must be 58 here, matching '/data/eshop/cleaned/2018/11/25/12/58'.
$hive> alter table eshop.logs add if not exists partition(year=2018,month=11,day=25,hour=12,minute=58);
2.load数据到表中。
$hive> load data inpath '/data/eshop/cleaned/2018/11/25/12/58' into table eshop.logs partition(year=2018,month=11,day=25,hour=12,minute=58);
3.查询topN
-- Switch to eshop so the unqualified names below (logs, stats) resolve there; otherwise
-- "create table stats" lands in the default database and the sqoop export of eshop.db/stats finds nothing.
$hive> use eshop;
$hive> select * from logs ;
-- Sort descending by hit count (append "limit N" to keep only the top N rows)
$hive> select request, count(*) as c
       from logs
       where year = 2018 and month = 11 and day = 25
       group by request
       order by c desc ;
4.创建统计结果表
-- Comma-delimited text so the files can be exported by sqoop in step 5
$hive> create table stats(request string, c int) row format DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n' STORED AS TEXTFILE;
$hive> insert into stats
       select request, count(*) as c
       from logs
       where year = 2018 and month = 11 and day = 25
       group by request
       order by c desc ;
5.Mysql中创建表
mysql> create table stats (id int primary key auto_increment,request varchar(200), c int);
5.使用sqoop将hive中的数据导出到mysql
-- --columns maps the two Hive fields onto (request, c); id is filled by auto_increment
$>sqoop export \
    --connect jdbc:mysql://192.168.43.1:3306/eshop \
    --driver com.mysql.jdbc.Driver \
    --username mysql --password mysql \
    --table stats \
    --columns request,c \
    --export-dir hdfs://s100/user/hive/warehouse/eshop.db/stats
6.将以上2-5步写成脚本,使用cron进行调度.
a.描述 每天的凌晨2点整,统计昨天的日志。 b.创建bash脚本 1.创建准备脚本 -- 动态创建hivesql脚本文件[stat.ql]。 [/usr/local/bin/prestats.sh] #!/bin/bash y=`date +%Y` m=`date +%m` d=`date -d "-0 day" +%d` m=$(( m+0 )) d=$(( d+0 )) # 删除之前的hql文件 rm -rf stat.ql #添加分区 echo "alter table eshop.logs add if not exists partition(year=${y},month=${m},day=${d},hour=9,minute=28);" >> stat.ql #加载数据放到分区 echo "load data inpath 'hdfs://s201/user/centos/eshop/cleaned/${y}/${m}/${d}/9/28' into table eshop.logs partition(year=${y},month=${m},day=${d},hour=9,minute=28);" >> stat.ql #统计数据,并将结果插入到stats表 echo "insert into eshop.stats select request,count(*) as c from eshop.logs where year = ${y} and month = ${m} and day = ${d} and hour=9 and minute = 28 group by request order by c desc ;" >> stat.ql 2.创建执行脚本 [/usr/local/bin/exestats.sh] #!/bin/bash # 创建hive脚本文件 ./prestats.sh #执行hive的ql脚本 hive -f stat.ql #执行sqoop导出到mysql sqoop export --connect jdbc:mysql://192.168.43.1:3306/eshop --username mysql --password mysql --table stats --columns request,c --export-dir /user/hive/warehouse/eshop.db/stats #sqoop export --connect jdbc:mysql://192.168.43.1:3306/eshop --username mysql --password mysql --table stats --export-dir /user/hive/warehouse/eshop.db/stats 3.修改所有权限 $>sudo chmod a+x /usr/local/bin/prestats.sh $>sudo chmod a+x /usr/local/bin/exestats.sh 7.编写 java 客户端进行以上步骤 a.在hive主机上启动hiveserver2 $> hiveserver2 & b.编写java客户端通过jdbc访问hive数据 1)新建HiveClient模块,添加maven支持 <?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.test</groupId> <artifactId>HiveClient</artifactId> <version>1.0-SNAPSHOT</version> <dependencies> <dependency> <groupId>org.apache.hive</groupId> <artifactId>hive-jdbc</artifactId> <version>2.1.0</version> </dependency> <dependency> <groupId>mysql</groupId> 
<artifactId>mysql-connector-java</artifactId> <version>5.1.17</version> </dependency> </dependencies> </project> 2)编写类进行查询和插入StatDao.java package com.test.hiveclient; import org.apache.hadoop.hbase.client.Result; import java.sql.*; import java.util.HashMap; import java.util.Map; public class StatDao { private static Map<String, Integer> map = new HashMap<String, Integer>(); public static void main(String [] args) { try { Class.forName("org.apache.hive.jdbc.HiveDriver"); //建立连接 Connection conn = DriverManager.getConnection("jdbc:hive2://192.168.43.131:10000/eshop","",""); System.out.println(conn); PreparedStatement ppst = conn.prepareStatement("select * from stats"); ResultSet set = ppst.executeQuery(); while (set.next()) { map.put(set.getString(1), set.getInt(2)); System.out.print(set.getString(1) + " : "); System.out.print(set.getInt(2)); System.out.println(); } getMysqlConn(); } catch (Exception e) { e.printStackTrace(); } } public static void getMysqlConn() { try { //加载类(加载驱动程序) Class.forName("com.mysql.jdbc.Driver"); //数据库连接url String url = "jdbc:mysql://192.168.43.1:3306/eshop" ; //username String user = "mysql"; //password String pass = "mysql" ; //得到连接 Connection conn = DriverManager.getConnection(url, user, pass); //创建语句对象 Statement st = conn.createStatement(); for(String key : map.keySet() ) { PreparedStatement pt = conn.prepareStatement("insert into stats (request,count) values(?,?)"); pt.setString(1, key); pt.setInt(2, map.get(key)); pt.executeUpdate(); } } catch (Exception e) { e.printStackTrace(); } } } 二、JFreeChart生成统计图表 ---------------------------------------------------------- 1.pom.xml <dependency> <groupId>jfree</groupId> <artifactId>jfreechart</artifactId> <version>1.0.13</version> </dependency> 2.使用JFreechart生成图片 package com.test.eshop.test; import org.jfree.chart.ChartFactory; import org.jfree.chart.ChartUtilities; import org.jfree.chart.JFreeChart; import org.jfree.chart.plot.PiePlot; import org.jfree.chart.plot.PiePlot3D; import 
org.jfree.data.general.DefaultPieDataset; import org.jfree.data.general.PieDataset; import org.junit.Test; import java.awt.*; import java.io.File; import java.io.IOException; /** * 测试饼图 */ public class TestJfreechart { @Test public void pie() throws Exception { File f = new File("d:/pie.png"); //数据集 DefaultPieDataset ds = new DefaultPieDataset(); ds.setValue("HuaWei",3000); ds.setValue("Apple",5000); ds.setValue("Mi",1890); JFreeChart chart = ChartFactory.createPieChart("饼图演示", ds, false, false, false); Font font = new Font("宋体",Font.BOLD,15); chart.getTitle().setFont(font); //背景透明 ((PiePlot)chart.getPlot()).setForegroundAlpha(0.2f); ((PiePlot)chart.getPlot()).setExplodePercent("Apple",0.1f); ((PiePlot)chart.getPlot()).setExplodePercent("HuaWei",0.2f); ((PiePlot)chart.getPlot()).setExplodePercent("Mi",0.3f); //创建3D饼图 ChartUtilities.saveChartAsJPEG(f, chart,400,300); } } 三、引入Spark推荐系统 -------------------------------------------------------------------- 1.设计用户商品表 -- Mysql create table useritems(id int primary key auto_increment ,userid int, itemid int, score int , time timestamp); 2.添加映射文件UserItem.hbm.xml 3.Dao + Service 4.controller 5.spark部分 a)通过sqoop到处 mysql 数据到hdfs $> sqoop import --connect jdbc:mysql://192.168.43.1:3306/eshop --driver com.mysql.jdbc.Driver --username mysql --password mysql --table useritems --columns userid,itemid,score -m 2 --target-dir /data/eshop/recommends --check-column id --incremental append --last-value 0 b)启动spark集群 c)启动spark-shell $> spark-shell --master spark://s100:7077 #内置SparkSession--spark $scala> import org.apache.spark.ml.evaluation.RegressionEvaluator import org.apache.spark.ml.recommendation.ALS import spark.implicits._ case class UserItem(userId: Int, itemId: Int, score : Int); def parseRating(str: String): UserItem = { val fields = str.split(",") UserItem(fields(0).toInt, fields(1).toInt, fields(2).toInt) } val useritems = spark.read.textFile("hdfs://s100/data/eshop/recommends").map(parseRating).toDF() //val test = 
// spark.read.textFile("hdfs://s100/data/eshop/testdata.txt").map(parseRating).toDF()
// (rest of the commented-out "val test =" alternative: a separate test file instead of a random split)
val Array(training, test) = useritems.randomSplit(Array(0.8, 0.2))

// Users/items that occur only in the random test split get NaN predictions, which would make the
// RMSE below NaN; coldStartStrategy "drop" removes those rows (Spark 2.2+).
val als = new ALS().
  setMaxIter(5).
  setRegParam(0.01).
  setUserCol("userId").
  setItemCol("itemId").
  setRatingCol("score").
  setColdStartStrategy("drop")
val model = als.fit(training)

val predictions = model.transform(test)
val evaluator = new RegressionEvaluator().setMetricName("rmse").setLabelCol("score").setPredictionCol("prediction")
val rmse = evaluator.evaluate(predictions)
println(s"Root-mean-square error = $rmse")

// Save the ALS model; overwrite() lets the script be re-run without failing on an existing path
val modelPath = "hdfs://s100/data/eshop/rec/model"
model.write.overwrite().save(modelPath)

// Load the model back from the same path it was saved to
import org.apache.spark.ml.recommendation.ALSModel
val loadedModel = ALSModel.load(modelPath)
val loadedPredictions = loadedModel.transform(test)

// Stop the session only after all Spark work: a stopped SparkSession can no longer load or transform
spark.stop()
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网
猜你喜欢:本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。