Big Data Project in Practice --- Real-Time Product Recommendation System for a Shopping Platform (Part 5)

I. Use Hive to load the cleaned data from HDFS
---------------------------------------------------------------------
   1. Dynamically add a table partition
      $hive> alter table eshop.logs add partition(year=2018,month=11,day=25,hour=12,minute=51);

   2. Load the data into the table.
      $hive> load data inpath '/data/eshop/cleaned/2018/11/25/12/58' into table eshop.logs  partition(year=2018,month=11,day=25,hour=12,minute=51);

   3. Query the top N
      $hive> select * from logs ;
      // top N, sorted by count descending
      $hive> select request,count(*) as c from logs where year = 2018 and month = 11 and day = 25 group by request order by c desc ;

   4. Create the statistics result table
      $hive> create table stats(request string, c int) row format DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n' STORED AS TEXTFILE;
      $hive> insert into stats select request,count(*) as c from logs where year = 2018 and month = 11 and day = 25 group by request order by c desc ;

   5. Create the corresponding table in MySQL
      mysql> create table stats (id int primary key auto_increment, request varchar(200), c int);

   6. Use Sqoop to export the Hive data to MySQL
      $>sqoop export --connect jdbc:mysql://192.168.43.1:3306/eshop --driver com.mysql.jdbc.Driver --username mysql --password mysql --table stats --columns request,c --export-dir hdfs://s100/user/hive/warehouse/eshop.db/stats
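
      After the export, a quick sanity check in MySQL (the table and columns follow the definition in step 5 above):
      mysql> select * from stats order by c desc limit 10;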

   7. Wrap steps 2-6 above into scripts and schedule them with cron.
      a. Description
         At 2:00 AM every day, compute the statistics for the previous day's logs.

      b. Create the bash scripts
         1. Create the preparation script -- it dynamically generates the HiveQL script file [stat.ql].
            [/usr/local/bin/prestats.sh]
            #!/bin/bash
            # the 2:00 AM run covers the previous day's logs
            y=`date -d "-1 day" +%Y`
            m=`date -d "-1 day" +%m`
            d=`date -d "-1 day" +%d`

            # strip leading zeros, forcing base 10 (otherwise 08/09 would be read as invalid octal)
            m=$((10#$m))
            d=$((10#$d))
            # remove the previous hql file
            rm -rf stat.ql

            # add the partition if missing (hour/minute are hardcoded from the demo run)
            echo "alter table eshop.logs add if not exists partition(year=${y},month=${m},day=${d},hour=9,minute=28);" >> stat.ql

            # load the data into the partition
            echo "load data inpath 'hdfs://s201/user/centos/eshop/cleaned/${y}/${m}/${d}/9/28' into table eshop.logs  partition(year=${y},month=${m},day=${d},hour=9,minute=28);" >> stat.ql

            # compute the stats and insert the result into the stats table
            echo "insert into eshop.stats select request,count(*) as c from eshop.logs where year = ${y} and month = ${m} and day = ${d} and hour=9 and minute = 28 group by request order by c desc ;" >> stat.ql

         2. Create the execution script
            [/usr/local/bin/exestats.sh]
            #!/bin/bash
            # generate the hive script file (absolute path, so the script also works under cron)
            /usr/local/bin/prestats.sh

            # run the HiveQL script
            hive -f stat.ql

            # export the results to MySQL with sqoop
            sqoop export --connect jdbc:mysql://192.168.43.1:3306/eshop  --username mysql --password mysql --table stats --columns request,c --export-dir /user/hive/warehouse/eshop.db/stats

         3. Make both scripts executable
            $>sudo chmod a+x /usr/local/bin/prestats.sh
            $>sudo chmod a+x /usr/local/bin/exestats.sh
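
      c. Add the crontab entry
         A sketch of the entry for the daily 2:00 AM run described in a. (the log path /tmp/exestats.log is an assumption):
         $> crontab -e
         # at 02:00 every day, run the stats pipeline and append its output to a log
         0 2 * * * /usr/local/bin/exestats.sh >> /tmp/exestats.log 2>&1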

    8. Write a Java client that performs the steps above
        a. Start hiveserver2 on the Hive host
            $> hiveserver2 &
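
            To confirm hiveserver2 is accepting connections, beeline (bundled with Hive) can connect using the same JDBC URL the client below will use:
            $> beeline -u jdbc:hive2://192.168.43.131:10000/eshop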

        b. Write a Java client that accesses the Hive data over JDBC
            1) Create a new HiveClient module and add Maven support
                <?xml version="1.0" encoding="UTF-8"?>
                <project xmlns="http://maven.apache.org/POM/4.0.0"
                         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
                         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
                    <modelVersion>4.0.0</modelVersion>

                    <groupId>com.test</groupId>
                    <artifactId>HiveClient</artifactId>
                    <version>1.0-SNAPSHOT</version>

                    <dependencies>

                        <dependency>
                            <groupId>org.apache.hive</groupId>
                            <artifactId>hive-jdbc</artifactId>
                            <version>2.1.0</version>
                        </dependency>

                        <dependency>
                            <groupId>mysql</groupId>
                            <artifactId>mysql-connector-java</artifactId>
                            <version>5.1.17</version>
                        </dependency>

                    </dependencies>


                </project>
            2) Write StatDao.java: query the Hive stats table and insert the results into MySQL
                package com.test.hiveclient;

                import java.sql.*;
                import java.util.HashMap;
                import java.util.Map;

                public class StatDao {

                    private static Map<String, Integer> map = new HashMap<String, Integer>();

                    public static void main(String[] args)
                    {
                        try {
                            Class.forName("org.apache.hive.jdbc.HiveDriver");
                            // open a connection to hiveserver2
                            Connection conn = DriverManager.getConnection("jdbc:hive2://192.168.43.131:10000/eshop", "", "");
                            System.out.println(conn);
                            PreparedStatement ppst = conn.prepareStatement("select * from stats");
                            ResultSet set = ppst.executeQuery();
                            while (set.next()) {
                                map.put(set.getString(1), set.getInt(2));
                                System.out.print(set.getString(1) + " : ");
                                System.out.print(set.getInt(2));
                                System.out.println();
                            }
                            set.close();
                            ppst.close();
                            conn.close();

                            // write the collected stats into MySQL
                            getMysqlConn();

                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }


                    public static void getMysqlConn()
                    {
                        try {
                            // load the MySQL JDBC driver class
                            Class.forName("com.mysql.jdbc.Driver");
                            // database connection url
                            String url = "jdbc:mysql://192.168.43.1:3306/eshop";
                            String user = "mysql";
                            String pass = "mysql";

                            // get the connection
                            Connection conn = DriverManager.getConnection(url, user, pass);
                            // prepare the insert once, outside the loop; the column is c, matching the MySQL table
                            PreparedStatement pt = conn.prepareStatement("insert into stats (request,c) values(?,?)");
                            for (String key : map.keySet()) {
                                pt.setString(1, key);
                                pt.setInt(2, map.get(key));
                                pt.executeUpdate();
                            }
                            pt.close();
                            conn.close();

                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
                }


II. Generate statistics charts with JFreeChart
----------------------------------------------------------
    1.pom.xml
      <dependency>
          <groupId>jfree</groupId>
          <artifactId>jfreechart</artifactId>
          <version>1.0.13</version>
      </dependency>

    2. Use JFreeChart to generate an image
        package com.test.eshop.test;

        import org.jfree.chart.ChartFactory;
        import org.jfree.chart.ChartUtilities;
        import org.jfree.chart.JFreeChart;
        import org.jfree.chart.plot.PiePlot;
        import org.jfree.data.general.DefaultPieDataset;
        import org.junit.Test;

        import java.awt.*;
        import java.io.File;

        /**
         * Pie chart test
         */
        public class TestJfreechart {

            @Test
            public void pie() throws Exception {
                File f = new File("d:/pie.png");

                // dataset
                DefaultPieDataset ds = new DefaultPieDataset();
                ds.setValue("HuaWei", 3000);
                ds.setValue("Apple", 5000);
                ds.setValue("Mi", 1890);

                JFreeChart chart = ChartFactory.createPieChart("Pie Chart Demo", ds, false, false, false);

                Font font = new Font("宋体", Font.BOLD, 15);
                chart.getTitle().setFont(font);

                // make the plot semi-transparent and pull the slices apart
                ((PiePlot) chart.getPlot()).setForegroundAlpha(0.2f);
                ((PiePlot) chart.getPlot()).setExplodePercent("Apple", 0.1f);
                ((PiePlot) chart.getPlot()).setExplodePercent("HuaWei", 0.2f);
                ((PiePlot) chart.getPlot()).setExplodePercent("Mi", 0.3f);

                // save the chart as a PNG file (matching the .png filename)
                ChartUtilities.saveChartAsPNG(f, chart, 400, 300);
            }
        }
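
    3. Stream the chart to the browser
        Rather than writing to a local file, the same chart can be written straight to an HTTP response. A minimal sketch, assuming a servlet-based web layer (the class name and the hard-coded dataset are assumptions):

        package com.test.eshop.web;

        import org.jfree.chart.ChartFactory;
        import org.jfree.chart.ChartUtilities;
        import org.jfree.chart.JFreeChart;
        import org.jfree.data.general.DefaultPieDataset;

        import javax.servlet.http.HttpServlet;
        import javax.servlet.http.HttpServletRequest;
        import javax.servlet.http.HttpServletResponse;
        import java.io.IOException;

        public class ChartServlet extends HttpServlet {

            @Override
            protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws IOException {
                // in the real project the dataset would come from the MySQL stats table
                DefaultPieDataset ds = new DefaultPieDataset();
                ds.setValue("HuaWei", 3000);
                ds.setValue("Apple", 5000);
                ds.setValue("Mi", 1890);

                JFreeChart chart = ChartFactory.createPieChart("Requests", ds, false, false, false);

                // stream the PNG bytes directly into the HTTP response
                resp.setContentType("image/png");
                ChartUtilities.writeChartAsPNG(resp.getOutputStream(), chart, 400, 300);
            }
        }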


III. Bring in the Spark recommendation system
--------------------------------------------------------------------
    1. Design the user-item table -- MySQL
        create table useritems(id int primary key auto_increment ,userid int, itemid int, score int , time timestamp);

    2. Add the mapping file UserItem.hbm.xml (a minimal sketch below)
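
        A minimal sketch of the mapping, assuming a UserItem POJO under com.test.eshop.domain (the package and property names are assumptions; the columns follow the useritems table above):

        <?xml version="1.0"?>
        <!DOCTYPE hibernate-mapping PUBLIC
                "-//Hibernate/Hibernate Mapping DTD 3.0//EN"
                "http://hibernate.sourceforge.net/hibernate-mapping-3.0.dtd">
        <hibernate-mapping>
            <class name="com.test.eshop.domain.UserItem" table="useritems">
                <id name="id" column="id">
                    <generator class="identity"/>
                </id>
                <property name="userid" column="userid"/>
                <property name="itemid" column="itemid"/>
                <property name="score" column="score"/>
                <property name="time" column="time" type="timestamp"/>
            </class>
        </hibernate-mapping>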

    3. Dao + Service (a minimal DAO sketch below)
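
        A minimal sketch of the DAO insert, assuming Hibernate is configured through hibernate.cfg.xml (the class and method names are assumptions, not the project's actual code):

        package com.test.eshop.dao;

        import com.test.eshop.domain.UserItem;
        import org.hibernate.Session;
        import org.hibernate.SessionFactory;
        import org.hibernate.cfg.Configuration;

        public class UserItemDao {

            private static SessionFactory factory =
                    new Configuration().configure().buildSessionFactory();

            // persist one user-item score record; called from the service layer
            public void insert(UserItem ui) {
                Session session = factory.openSession();
                try {
                    session.beginTransaction();
                    session.save(ui);
                    session.getTransaction().commit();
                } finally {
                    session.close();
                }
            }
        }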

    4. Controller

    5. Spark part
      a) Use Sqoop to import the MySQL data into HDFS
         $> sqoop import --connect jdbc:mysql://192.168.43.1:3306/eshop --driver com.mysql.jdbc.Driver --username mysql --password mysql --table useritems --columns userid,itemid,score -m 2 --target-dir /data/eshop/recommends --check-column id --incremental append --last-value 0
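
         For repeated runs, a saved Sqoop job can track --last-value automatically instead of it being passed by hand (the job name rec-import is an assumption):
         $> sqoop job --create rec-import -- import --connect jdbc:mysql://192.168.43.1:3306/eshop --driver com.mysql.jdbc.Driver --username mysql --password mysql --table useritems --columns userid,itemid,score -m 2 --target-dir /data/eshop/recommends --check-column id --incremental append --last-value 0
         $> sqoop job --exec rec-import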

      b) Start the Spark cluster

      c) Start spark-shell
         $> spark-shell --master spark://s100:7077

         # the built-in SparkSession is available as `spark`
         $scala>
            import org.apache.spark.ml.evaluation.RegressionEvaluator
            import org.apache.spark.ml.recommendation.ALS
            import spark.implicits._

            case class UserItem(userId: Int, itemId: Int, score: Int)

            def parseRating(str: String): UserItem = {
              val fields = str.split(",")
              UserItem(fields(0).toInt, fields(1).toInt, fields(2).toInt)
            }

            val useritems = spark.read.textFile("hdfs://s100/data/eshop/recommends").map(parseRating).toDF()

            //val test = spark.read.textFile("hdfs://s100/data/eshop/testdata.txt").map(parseRating).toDF()
            val Array(training, test) = useritems.randomSplit(Array(0.8, 0.2))

            val als = new ALS()
              .setMaxIter(5)
              .setRegParam(0.01)
              .setUserCol("userId")
              .setItemCol("itemId")
              .setRatingCol("score")
            val model = als.fit(training)

            val predictions = model.transform(test)

            val evaluator = new RegressionEvaluator().setMetricName("rmse").setLabelCol("score").setPredictionCol("prediction")
            val rmse = evaluator.evaluate(predictions)
            println(s"Root-mean-square error = $rmse")


            // save the ALS model
            model.save("hdfs://s100/data/eshop/rec/model")
            spark.stop()


            // load the model back (e.g. in a new spark-shell session, since spark.stop() ended this one);
            // the path matches where the model was saved above
            import org.apache.spark.ml.recommendation.ALSModel
            val model = ALSModel.load("hdfs://s100/data/eshop/rec/model")
            val predictions = model.transform(test)
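
            // generate the actual recommendation lists (ALSModel.recommendForAllUsers/Items
            // are available since Spark 2.2; top-10 is an arbitrary choice)
            val userRecs = model.recommendForAllUsers(10)   // one row per user: (userId, recommendations)
            userRecs.show(false)
            val itemRecs = model.recommendForAllItems(10)   // one row per item: (itemId, recommendations)
            itemRecs.show(false)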
