3-hive、sqoop

1、HIVE

  1、交互命令

        use db_name;

        create database db_name             //创建数据库
        create database if not exists db_name          //创建不存在的数据库

        create database db_name location '/database/'    //指定位置上创建数据库

        show databases;
        show databases like 'f.*';                //模糊查询

        drop database db_name
        drop database db_name cascade;            //当数据库中有表,必须使用cascade

  2、Hive操作

   ①数据库

          use db_name;

          create database db_name             //创建数据库
          create database if not exists db_name          //创建不存在的数据库

          create database db_name location '/database/'    //指定位置上创建数据库

          show databases;
          show databases like 'f.*';                //模糊查询

          drop database db_name
          drop database db_name cascade;            //当数据库中有表,必须使用cascade

   ②数据表

        、管理表:在删除表的时候,会同时把表中对应的数据删除;
        、外部表:在删除表的时候不会删除表中对应的数据,而只会删除表中的元数据信息;

        、分区表:Hive的分区就是分目录,把一个大的数据集拆分成一个个小的数据集,独立保存在不同的我文件夹中。
               在查询的时候可以通过where子句进行查询,提高查询效率。        

        、分桶:将一个文件分成几个小文件(基本没啥用)。
        ————————————————————————————————————————————————————————————
        、管理表(内部表),外部表
        ————————————————————————————————————————————————————————————
            create  table t1(id int,name string) row format delimited fields terminated by '\t';            //创建一个分隔符为\t的管理表
            create external table t1(id int,name string) row format delimited fields terminated by '\t';        //创建一个分隔符为\t的外部表
        ————————————————————————————————————————————————————————————
        、分区表
        ————————————————————————————————————————————————————————————
             create table t1(id int ,name string) partitioned by (day string) row format delimited fields terminated by '\t';

             create table t4(
                id int ,
                name string
            ) partitioned by (month string,day string)
            row format delimited fields terminated by '\t';                    //创建二级分区

            ';
            ';                    //查询二级分区内容

            show partitions t1;                                //显示表t1的分区

            ①分区类型:
                静态分区:加载数据时指定分区的值

                    load data local inpath '/home/1.txt' into table t1 partition (sex='woman');

                动态分区:数据未知,需要根据分区的值来确定需要创建的分区

                    insert overwrite table t1 partition(sex='man',dt)        //把t1中的数据分一个sex='man'的区

                混合分区:静态和动态都有

   ③通用命令

            create table if not exists t2 like t1;                            //创建同一结构的表

            alter table t1 add columns(value1 string,value2 string);                    //增加列
            alter table t1 replace columns(value3 string,value4 string);                    //替换列

            alter table t1 add partition(day=');                            //增加分区
            alter table t1 drop partition(day=');

            desc t1;                                        //查看表结构

            show tables;
            show functions;                                    //显示hive函数

            drop table t1;
            truncate table t1;                                    //清空表的数据

            ————————————————————————————————————————————————————————
            ①数据导入导出:
            ————————————————————————————————————————————————————————

            语法:
                load data [local] inpath ')];
                insert [overwrite] [directory 'url' | local directory 'url' | table tablename]     HQL;

            insert into t1(id,name) value(,'asd');
            insert into t1 partition(month=,'bangzhang');

            load data  inpath '/home/bigdata' into table hive.dep;                //把hdfs上的文件剪切到dep的表中,如果文件存在,则追加内容
            load data  inpath '/home/bigdata' overwrite into table hive.dep;            //把hdfs上的文件剪切到dep的表中,如果文件存在,则覆盖

            load data local inpath '/home/bigdata' overwrite into table hive.dep;            //把本地文件传到hive数据库的dep的表中

            load data local inpath ');        //加载数据到分区表
            load data local inpath ');    //加载数据到二级分区

            insert overwrite table t2 select * from t1;                        //从t1挑选数据重写到t2

            insert overwrite directory '/aaa/bbb/' select *from t_p;                //将挑选出来的数据写到hdfs目录下
            insert overwrite local directory '/home/hadoop/test' select *from t_name;        //将挑选出来的数据写到本地目录下

            hdfs dfs -get /user/hive/warehouse/student4/student.txt /data/student5.txt;        //通过hdfs进行导出

   ④HQL语句(排序)

          order by:全局排序            //全局排序,适用于一个reduce
          distribute by :指定字段分区        //必须设置reduce个数,不然只会有一个reduce,一个分区
          sort by:reduce排序
          partition by  :用于窗口函数中

        cluster by:当分区字段和排序字段相同时,可以用cluster by
        clustered by:分桶关键字

        select * from t1 order by sal DESC,deptno ASC;            //全局排序,适用于一个reduce

        ;                //设置reduce个数,默认为1,系统默认reduce个数为分区个数
        select * from t1 distribute by deptno sort by id DESC;        //distribute by    分区
                                    //sort by        排序

        select classID,SUM(grade) over(partition by classID) fromtb_Student     //窗口函数:over关键字

        select mid, money, name from store cluster by mid  ==>    select mid, money, name from store distribute by mid sort by mid
    

   ⑤窗口函数 

          窗口函数:row_number、rank、dense_rank
             a
             a
             b
             b
             a
             a
             a
             b
             a
            b
            a
          select id,name,sal,
          rank()over(partition by name order by sal desc ) rp,
          dense_rank() over(partition by name order by sal desc ) drp,
          row_number()over(partition by name order by sal desc) rmp
          from f_test

            b
             b
             b
             b
             a
             a
             a
             a
             a
             a
             a
          可以发现,窗口函数就是多了一列的排名列
          partition by name order by sal desc  <==> distribute by name sort by id desc
          也就是说:partition by  和 distribute by等价,但是后面是order by 而不是sort by

          窗口函数:LAG和LEAD:增加一列,列的值就是前(后)一行的某一列的值

          窗口函数:first_value和last_value

              first_value取分组内排序后,截止到当前行,第一个值
              last_value取分组内排序后,截止到当前行,最后一个值

   ⑥自定义正则输入

      注:有时候数据并非是用  '\t' 进行分割,就需要自己清洗。

          CREATE TABLE apachelog (
               host string,
                identity string,
                agent string
          )row format serde 'org.apache.hadoop.hive.serde2.RegexSerDe'    //以正则的方式建表
          WITH SERDEPROPERTIES (
            "input.regex" = "([^]*) ([^]*) ([^]*)"                //用正则提取,每个()表示一个字段,里面是正则表达式
          )
          stored as textfile ;

   ⑦自定义HIVE函数

        注:经过上一步之后,字段中保存的是“/Aug/::: +”,我们希望在select能将它变为2015-- ::37这种标准格式。

        public class TestUDF extends UDF{

            public String evaluate(String time) throws ParseException {

                String output=null;
                SimpleDateFormat  inputDate=new SimpleDateFormat("dd/MM/yyyy:HH:mm:ss",Locale.ENGLISH);
                SimpleDateFormat outputDate=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

                if(time==null) {
                    return null;
                }
                if(StringUtils.isBlank(time)) {
                    return null;
                }
                String pares=time.replaceAll("\"", "");
                Date parseDate=inputDate.parse(pares);
                output=outputDate.format(parseDate);
                return output;
            }
        }

        注:UDF是HIVE自定义函数的类。

            、重写evaluate方法。
            、打包成jar,上传到linux
            、add jar /data/myudf.jar;                    //hive命令行
            、create temporary function my_upper as "org.udf.test.Lower";    //给函数取名,并且关联包中的类
            、select my_upper(name) as name from student7;        //调用

        自定义函数是非常重要的,对于很多数据,用函数对其清洗,使其规律化。   

   ⑧压缩

      和MR一样,都有压缩,分为map端压缩和reduce端压缩,可以在配置文件中设置。

      注:为了减少网络传输,将map产生的分区文件进行压缩,然后传给reduce,执行解压。(map压缩)
        为了减少输出,将reduce的结果进行压缩。(reduce压缩)

   ⑨文件存储格式

        orc: 列式存储
        testfile : 文本格式
        Parquet:列式存储 二进制存储方式

        注:数据压缩比例上ORC最优,相比textfile节省了50倍磁盘空间,parquet压缩性能也较好。
        注:SQL查询速度而言,ORC与parquet性能较好    

        create table t1(
            id int ,name string
        ) partitioned by (day string)
        row format delimited fields terminated by '\t'
        stored as orc ;

  3、分区与目录关联

    案例:把数据直接先上传到HDFS的分区目录中,在让分区表与数据产生关联

      1 上传数据后修复

        1) 首先在HDFS上先创建一个目录,为如下结构

          /user/hive/warehouse/dept_partition2/month=2/day=16

        2) 上传数据到该目录中

          dfs -put /data/dept.txt /user/hive/warehouse/dept_partition2/month=2/day=16;

        3) 查询分区表,查看上传之后的数据

          select * from dept_partition2 where month='2' and day='16'; //但是发现结果没有查询到任何数据

        4) 执行修复命令,使HDFS上的数据与表结构相关联

          msck repair table dept_partition2;

        5) 再次查询分区表结果就可以查询到数据内容

          select * from dept_partition2 where month='2' and day='16';

   4、HIVE优化

      三个方面:
          、不执行mapreduce        :Fetch抓取
          、不用分布式去执行mapreduce    :本地模式
          、进行过滤            :严格模式
          、多线程                :并行执行

          、内存方面            :jvm重用
          、开启推测执行
          、合理设置map和reduce个数

      ①Fetch抓取

          一个简单的查询语句,是指一个没有函数、排序等功能的语句,当开启一个Fetch Task功能,
          就执行一个简单的查询语句不会生成MapRreduce作业,而是直接使用FetchTask,从hdfs文件系统中进行查询输出数据,从而提高效率。

          hive.fetch.task.conversion=more            //hive-site中设置

      ②本地模式

          在hive的运行过程中,如果输入的数据量非常小,可以启动本地模式,在hive的本地节点上进行运行,不会提交任务到集群中。这样可以避免集群分配资源造成的时间浪费

          等于说以前用集群,但是如果输入数据量小的话,可以只用一台机器去运行。

          hive.exec.mode.local.auto=true;            //hive-site中设置
          hive.exec.mode.local.auto.inputbytes.max=    //输入超过128M不能本地模式
          ;        //输入超过四个文件不能本地模式

      ③表的优化

          ) 大表在join小表的情况下,把小表放到join的左边,这样小表就会被缓存到map中,之后在进行大表的读取:
          ) 在进行join的时候尽量检查HQL语句,不要产生笛卡儿积
          )数据的倾斜

               合理设置map端的数量,通过切片的数量, 以及input目录中文件的数量来决定

               当输入的数据存下很多小文件,小文件的数据远远小于128MB ,每个小文件都会被当成一个map任务输出,
              而每个map的启动和初始化都是需要资源的,启动的任务,与要分析的任务的时长不成正比,会造成很大的资源浪费

               合理的设置reduce数量

              hive.exec.reducers.max=            //reduce的个数

      ④并行执行

          Hive语句会把查询转换为多个阶段,抽样阶段,合并阶段,limit阶段。默认情况下hive只会一个任务执行一个阶段。
          当某些任务不存在依赖关系时,可以开启并发执行

              hive.exec.parallel=true;
              hive.exec.parallel.thread.number=;

      ⑤严格模式

              hive.mapred.mode=strict;            //nonstrict: 非严格模式

          ) 对于分区表: 除非where语句中,包含分区字段的过滤条件,否则不让执行
          ) 使用order by语句进行查询,必须包含limit 子句进行限制否则不让执行
          ) 限制笛卡儿积的查询

      ⑥JVM重用
  
           对于MR任务执行的时候存在很多小文件,task任务启动就会很多,每个任务执行的时间都很短,
            JVM重用可以使得JVM实例在同一个JOB中重新使用N次。可以减少JVM的启动次数以及资源的消耗,

          <property>
                <name>mapreduce.job.jvm.numtasks</name>
                <value></value>                //mapred-site文件中修改
          </property>

      ⑦推测执行

          原因:因为程序bug,负载不均衡或者资源分布不均,造成同一个job的多个task运行速度不一致,有的task运行速度明显慢于其他task。

          解决:为该任务重新启动的一个,备份任务,与原来的任务一致,这两个任务同时运行,哪个任务先执行完毕,则以哪个任务为准。

          前提:
              ) 每个task只能存在一个备份任务
              ) 当前的job 必须完成task任务的5%

          不适用:
              )任务出现了严重数据倾斜
              )特殊的任务,reduce 把结果输出到数据库中

              缺点:以空间换时间,当需要备份任务过多,只会更加拖累进程。当发生严重数据倾斜时,备份只会是火上浇油。

  5、其他  

      ①虚拟列

           INPUT__FILE__NAME:                     //可以查询表中文件存储的位置
           BLOCK__OFFSET__INSIDE__FILE:                 //表中文本中数据的偏移量

           select id ,INPUT__FILE__NAME ,BLOCK__OFFSET__INSIDE__FILE  from  t1

           注:开启之后能查出文件的存储位置和数据的偏移量信息

      ② Hive Server2

          注:将此HIVE开放,客户端能连

          ) 启动hiveserver2的命令

              bin/hiveserver2 

          ) 客户端连接命令

              bin/beeline                     //首先使用beeline 命令进入到命令行模式
              
              !connect jdbc:hive2://hadoop01:10000 hadoop sacsac    //之后进行连接

              链接之后,能执行HQL语句。

          )API

              通过写java代码,能执行HQL语句

2、sqoop

、Sqoop底层是使用mapreduce实现的,但是只是用到了map阶段,没有用到reduce阶段

    思考:为什么sqoop使用mapreduce底层来实现? 

        答:Mapreduce是一个分布式计算框架,传输海量数据的时候效率更高

    思考2:为什么sqoop只使用了map阶段没使用redue阶段? 

        答:Sqoop 仅仅是做数据传输,并涉及到计算,所以没有使用到redcue

    //导入import,导出export

、mysql---->hdfs

    create table to_hdfs(
        id int primary key not null,
        name varchar() not null
    );
    insert into to_hdfs(id,name)values(,'tiantian');        //现在mysql中创建表to_hdfs和数据
    insert into to_hdfs(id,name)values(,'xuewei');
    insert into to_hdfs(id,name)values(,'xiaopang');
    insert into to_hdfs(id,name)values(,'bangzhang');

    sqoop import
    --connect jdbc:mysql://192.168.40.10:3306/test
    --username root
    --password root
    --table to_hdfs                    //连接mysql的test中的to_hdfs的数据
    --target-dir /to_hdfs                 //导出到/to_hdfs文件夹下(自动创建)
    --fields-terminated-by '\t' \                //导出的文件分隔符为\t
    --num-mappers                     //map数量为1,导出成一个文件    

    //--delete-target-dir                //如果输出目录存在则删除

    sqoop import
    --connect jdbc:mysql://hadoop01:3306/test
    --username root
    --password root
    --table to_hdfs
    --fields-terminated-by '\t'
    --target-dir /to_hdfs2
    --num-mappers
    --check-column id                     //检查列
    --incremental append                 //是否是追加
    --last-value                     //检查列的值

    mysql中test数据库to_hdf表中id大于8的才会被追加到/to_hdfs2 下的文件中

、mysql----->hdfs

    sqoop import
    --connect jdbc:mysql://s10:3306/test
    --username root
    --password root
    --table to_hdfs
    --target-dir /to_hdfs3
    --hive-import
    --hive-database hive_test
    --hive-table stu_info
    --num-mappers
    --fields-terminated-by '\t'

、hdfs----->mysql

    create table to_mysql(
    id int primary key not null,
    name varchar() not null
    );

    sqoop export
    --connect jdbc:mysql://s10:3306/test
    --username root
    --password root
    --table to_mysql
    --export-dir /user/hive/warehouse/session_info/day=
    --num-mappers
    --input-fields-terminated-by '\t'

、job

    当某些导入导出命令,经常用时,可以作为一个job保存。

    sqoop job \
    --create stu_info \
    -- \
    import
    --connect jdbc:mysql://hadoop01:3306/test \
    --username root \
    --password root \
    --table to_hdfs \
    --target-dir /to_hdfs4 \
    --num-mappers  

    sqoop job --list        //查看已经创建的job列表
    sqoop job --exec stu_info    //执行job,需要输入mysql的密码
上一篇:常用Java API之Ramdom--用代码模拟猜数小游戏


下一篇:Linux 文件查找