I. Loading the cleaned data on HDFS into Hive
--------------------------------------------------------------------
1. Dynamically add a table partition
$hive> alter table eshop.logs add partition(year=2018,month=11,day=25,hour=12,minute=51);
2. Load the data into the table.
$hive> load data inpath '/data/eshop/cleaned/2018/11/25/12/58' into table eshop.logs partition(year=2018,month=11,day=25,hour=12,minute=51);
3. Query the top N
$hive> select * from logs ;
// top N in descending order
$hive> select request,count(*) as c from logs where year = 2018 and month = 11 and day = 25 group by request order by c desc ;
4. Create a table for the statistics results
$hive> create table stats(request string, c int) row format DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n' STORED AS TEXTFILE;
$hive> insert into stats select request,count(*) as c from logs where year = 2018 and month = 11 and day = 25 group by request order by c desc ;
5. Create the stats table in MySQL
mysql> create table stats (id int primary key auto_increment,request varchar(200), c int);
6. Export the data from Hive to MySQL with Sqoop
$>sqoop export --connect jdbc:mysql://192.168.43.1:3306/eshop --driver com.mysql.jdbc.Driver --username mysql --password mysql --table stats --columns request,c --export-dir hdfs://s100/user/hive/warehouse/eshop.db/stats
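As a quick sanity check (an addition, not one of the original steps), the exported top N can be read back from MySQL:
mysql> select request, c from stats order by c desc limit 10;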
7. Wrap the steps above (add the partition, load the data, aggregate into stats, export with Sqoop) into scripts and schedule them with cron.
a. Description
At 2:00 a.m. every day, aggregate the previous day's logs. (A sample crontab entry is shown after the scripts below.)
b. Create the bash scripts
1. Create the preparation script, which dynamically generates the HiveQL script file [stat.ql].
[/usr/local/bin/prestats.sh]
#!/bin/bash
# take yesterday's date, since the 2 a.m. run aggregates the previous day's logs
y=`date -d "-1 day" +%Y`
m=`date -d "-1 day" +%m`
d=`date -d "-1 day" +%d`
# strip leading zeros (force base 10 so that 08/09 are not parsed as invalid octal)
m=$(( 10#$m ))
d=$(( 10#$d ))
# remove the previously generated hql file
rm -rf stat.ql
# add the partition
echo "alter table eshop.logs add if not exists partition(year=${y},month=${m},day=${d},hour=9,minute=28);" >> stat.ql
# load the data into the partition
echo "load data inpath 'hdfs://s201/user/centos/eshop/cleaned/${y}/${m}/${d}/9/28' into table eshop.logs partition(year=${y},month=${m},day=${d},hour=9,minute=28);" >> stat.ql
# aggregate the data and insert the result into the stats table
echo "insert into eshop.stats select request,count(*) as c from eshop.logs where year = ${y} and month = ${m} and day = ${d} and hour=9 and minute = 28 group by request order by c desc ;" >> stat.ql
2. Create the execution script
[/usr/local/bin/exestats.sh]
#!/bin/bash
# generate the hive script file (absolute path, since cron's default PATH is minimal)
/usr/local/bin/prestats.sh
# run the generated hive ql script
hive -f stat.ql
# export the results to mysql with sqoop
sqoop export --connect jdbc:mysql://192.168.43.1:3306/eshop --username mysql --password mysql --table stats --columns request,c --export-dir /user/hive/warehouse/eshop.db/stats
#sqoop export --connect jdbc:mysql://192.168.43.1:3306/eshop --username mysql --password mysql --table stats --export-dir /user/hive/warehouse/eshop.db/stats
3. Make both scripts executable
$>sudo chmod a+x /usr/local/bin/prestats.sh
$>sudo chmod a+x /usr/local/bin/exestats.sh
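4. A sample crontab entry (an illustrative addition: the 02:00 time comes from the description above, the log path is only a suggestion):
$> crontab -e
# run the statistics pipeline at 02:00 every day
0 2 * * * /usr/local/bin/exestats.sh >> /tmp/exestats.log 2>&1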
8. Write a Java client that performs the steps above
a. Start hiveserver2 on the Hive host
$> hiveserver2 &
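Optionally, verify that hiveserver2 is reachable with beeline before writing the client (this check is an addition; it assumes the default port 10000 on the local host):
$> beeline -u jdbc:hive2://localhost:10000/eshop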
b. Write a Java client that accesses the Hive data over JDBC
1) Create a new HiveClient module and add Maven support
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.test</groupId>
    <artifactId>HiveClient</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-jdbc</artifactId>
            <version>2.1.0</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.17</version>
        </dependency>
    </dependencies>
</project>
2) Write a class that queries the Hive stats and inserts them into MySQL: StatDao.java
package com.test.hiveclient;

import java.sql.*;
import java.util.HashMap;
import java.util.Map;

public class StatDao {
    private static Map<String, Integer> map = new HashMap<String, Integer>();

    public static void main(String[] args) {
        try {
            Class.forName("org.apache.hive.jdbc.HiveDriver");
            // open the connection to hiveserver2
            Connection conn = DriverManager.getConnection("jdbc:hive2://192.168.43.131:10000/eshop", "", "");
            System.out.println(conn);
            PreparedStatement ppst = conn.prepareStatement("select * from stats");
            ResultSet set = ppst.executeQuery();
            while (set.next()) {
                map.put(set.getString(1), set.getInt(2));
                System.out.print(set.getString(1) + " : ");
                System.out.print(set.getInt(2));
                System.out.println();
            }
            getMysqlConn();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static void getMysqlConn() {
        try {
            // load the MySQL driver class
            Class.forName("com.mysql.jdbc.Driver");
            // database connection url
            String url = "jdbc:mysql://192.168.43.1:3306/eshop";
            // username
            String user = "mysql";
            // password
            String pass = "mysql";
            // open the connection
            Connection conn = DriverManager.getConnection(url, user, pass);
            // insert every (request, c) pair into the mysql stats table
            // (the column is named c, matching the create table statement above)
            for (String key : map.keySet()) {
                PreparedStatement pt = conn.prepareStatement("insert into stats (request,c) values(?,?)");
                pt.setString(1, key);
                pt.setInt(2, map.get(key));
                pt.executeUpdate();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
II. Generating charts with JFreeChart
--------------------------------------------------------------------
1.pom.xml
<dependency>
    <groupId>jfree</groupId>
    <artifactId>jfreechart</artifactId>
    <version>1.0.13</version>
</dependency>
2. Generate an image with JFreeChart
package com.test.eshop.test;

import org.jfree.chart.ChartFactory;
import org.jfree.chart.ChartUtilities;
import org.jfree.chart.JFreeChart;
import org.jfree.chart.plot.PiePlot;
import org.jfree.data.general.DefaultPieDataset;
import org.junit.Test;

import java.awt.*;
import java.io.File;

/**
 * Pie chart test
 */
public class TestJfreechart {
    @Test
    public void pie() throws Exception {
        File f = new File("d:/pie.png");
        // dataset
        DefaultPieDataset ds = new DefaultPieDataset();
        ds.setValue("HuaWei", 3000);
        ds.setValue("Apple", 5000);
        ds.setValue("Mi", 1890);
        JFreeChart chart = ChartFactory.createPieChart("Pie Chart Demo", ds, false, false, false);
        Font font = new Font("宋体", Font.BOLD, 15);
        chart.getTitle().setFont(font);
        // make the plot semi-transparent (foreground alpha)
        ((PiePlot) chart.getPlot()).setForegroundAlpha(0.2f);
        // pull the three slices out from the center by different amounts
        ((PiePlot) chart.getPlot()).setExplodePercent("Apple", 0.1f);
        ((PiePlot) chart.getPlot()).setExplodePercent("HuaWei", 0.2f);
        ((PiePlot) chart.getPlot()).setExplodePercent("Mi", 0.3f);
        // save the chart as a PNG file (matching the .png file name above)
        ChartUtilities.saveChartAsPNG(f, chart, 400, 300);
    }
}
III. Adding a Spark recommendation system
--------------------------------------------------------------------
1. Design the user-item table -- MySQL
create table useritems(id int primary key auto_increment ,userid int, itemid int, score int , time timestamp);
2. Add the mapping file UserItem.hbm.xml
3. Dao + Service (a minimal sketch of the Dao follows step 4 below)
4. Controller
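Below is a minimal, hypothetical sketch of what the Dao layer records, written in plain JDBC in the same style as StatDao and against the useritems table above. The project itself maps UserItem through Hibernate (UserItem.hbm.xml), so the real Dao would go through a SessionFactory; the names UserItemDao and insert are illustrative only.

package com.test.eshop.dao;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

public class UserItemDao {
    // record one behaviour event: user 'userid' scored item 'itemid' (e.g. view/favourite/purchase weight)
    public void insert(int userid, int itemid, int score) throws Exception {
        Class.forName("com.mysql.jdbc.Driver");
        Connection conn = DriverManager.getConnection(
                "jdbc:mysql://192.168.43.1:3306/eshop", "mysql", "mysql");
        PreparedStatement ps = conn.prepareStatement(
                "insert into useritems (userid,itemid,score,time) values (?,?,?,now())");
        ps.setInt(1, userid);
        ps.setInt(2, itemid);
        ps.setInt(3, score);
        ps.executeUpdate();
        ps.close();
        conn.close();
    }
}

The Service layer would wrap this method and the Controller would call it whenever a user views, favourites or buys an item, so the useritems table keeps accumulating training data for the ALS job below.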
5. The Spark part
a) Export the MySQL data to HDFS with Sqoop (a Sqoop import into the cluster)
$> sqoop import --connect jdbc:mysql://192.168.43.1:3306/eshop --driver com.mysql.jdbc.Driver --username mysql --password mysql --table useritems --columns userid,itemid,score -m 2 --target-dir /data/eshop/recommends --check-column id --incremental append --last-value 0
b) Start the Spark cluster (see the command below)
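Assuming a standalone cluster with s100 as the master and the worker hosts listed in conf/slaves, the bundled script on the master starts the whole cluster:
$> $SPARK_HOME/sbin/start-all.sh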
c) Start spark-shell
$> spark-shell --master spark://s100:7077
# the built-in SparkSession is available as the variable spark
$scala>
import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.recommendation.ALS
import spark.implicits._
case class UserItem(userId: Int, itemId: Int, score : Int);
def parseRating(str: String): UserItem = {
val fields = str.split(",")
UserItem(fields(0).toInt, fields(1).toInt, fields(2).toInt)
}
val useritems = spark.read.textFile("hdfs://s100/data/eshop/recommends").map(parseRating).toDF()
//val test = spark.read.textFile("hdfs://s100/data/eshop/testdata.txt").map(parseRating).toDF()
val Array(training, test) = useritems.randomSplit(Array(0.8, 0.2))
val als = new ALS()
  .setMaxIter(5)
  .setRegParam(0.01)
  .setUserCol("userId")
  .setItemCol("itemId")
  .setRatingCol("score")
val model = als.fit(training)
val predictions = model.transform(test)
val evaluator = new RegressionEvaluator().setMetricName("rmse").setLabelCol("score").setPredictionCol("prediction")
val rmse = evaluator.evaluate(predictions)
println(s"Root-mean-square error = $rmse")
// save the ALS model
model.save("hdfs://s100/data/eshop/rec/model")
spark.stop()
// load the model back later (e.g. in a new spark-shell session), from the path it was saved to
import org.apache.spark.ml.recommendation.ALSModel
val model = ALSModel.load("hdfs://s100/data/eshop/rec/model")
val predictions = model.transform(test)