1、HIVE
1、交互命令
use db_name; create database db_name //创建数据库 create database if not exists db_name //创建不存在的数据库 create database db_name location '/database/' //指定位置上创建数据库 show databases; show databases like 'f.*'; //模糊查询 drop database db_name drop database db_name cascade; //当数据库中有表,必须使用cascade
2、Hive操作
①数据库
use db_name; create database db_name //创建数据库 create database if not exists db_name //创建不存在的数据库 create database db_name location '/database/' //指定位置上创建数据库 show databases; show databases like 'f.*'; //模糊查询 drop database db_name drop database db_name cascade; //当数据库中有表,必须使用cascade
②数据表
、管理表:在删除表的时候,会同时把表中对应的数据删除; 、外部表:在删除表的时候不会删除表中对应的数据,而只会删除表中的元数据信息; 、分区表:Hive的分区就是分目录,把一个大的数据集拆分成一个个小的数据集,独立保存在不同的我文件夹中。 在查询的时候可以通过where子句进行查询,提高查询效率。 、分桶:将一个文件分成几个小文件(基本没啥用)。 ———————————————————————————————————————————————————————————— 、管理表(内部表),外部表 ———————————————————————————————————————————————————————————— create table t1(id int,name string) row format delimited fields terminated by '\t'; //创建一个分隔符为\t的管理表 create external table t1(id int,name string) row format delimited fields terminated by '\t'; //创建一个分隔符为\t的外部表 ———————————————————————————————————————————————————————————— 、分区表 ———————————————————————————————————————————————————————————— create table t1(id int ,name string) partitioned by (day string) row format delimited fields terminated by '\t'; create table t4( id int , name string ) partitioned by (month string,day string) row format delimited fields terminated by '\t'; //创建二级分区 '; '; //查询二级分区内容 show partitions t1; //显示表t1的分区 ①分区类型: 静态分区:加载数据时指定分区的值 load data local inpath '/home/1.txt' into table t1 partition (sex='woman'); 动态分区:数据未知,需要根据分区的值来确定需要创建的分区 insert overwrite table t1 partition(sex='man',dt) //把t1中的数据分一个sex='man'的区 混合分区:静态和动态都有
③通用命令
create table if not exists t2 like t1; //创建同一结构的表 alter table t1 add columns(value1 string,value2 string); //增加列 alter table t1 replace columns(value3 string,value4 string); //替换列 alter table t1 add partition(day='); //增加分区 alter table t1 drop partition(day='); desc t1; //查看表结构 show tables; show functions; //显示hive函数 drop table t1; truncate table t1; //清空表的数据 ———————————————————————————————————————————————————————— ①数据导入导出: ———————————————————————————————————————————————————————— 语法: load data [local] inpath ')]; insert [overwrite] [directory 'url' | local directory 'url' | table tablename] HQL; insert into t1(id,name) value(,'asd'); insert into t1 partition(month=,'bangzhang'); load data inpath '/home/bigdata' into table hive.dep; //把hdfs上的文件剪切到dep的表中,如果文件存在,则追加内容 load data inpath '/home/bigdata' overwrite into table hive.dep; //把hdfs上的文件剪切到dep的表中,如果文件存在,则覆盖 load data local inpath '/home/bigdata' overwrite into table hive.dep; //把本地文件传到hive数据库的dep的表中 load data local inpath '); //加载数据到分区表 load data local inpath '); //加载数据到二级分区 insert overwrite table t2 select * from t1; //从t1挑选数据重写到t2 insert overwrite directory '/aaa/bbb/' select *from t_p; //将挑选出来的数据写到hdfs目录下 insert overwrite local directory '/home/hadoop/test' select *from t_name; //将挑选出来的数据写到本地目录下 hdfs dfs -get /user/hive/warehouse/student4/student.txt /data/student5.txt; //通过hdfs进行导出
④HQL语句(排序)
order by:全局排序 //全局排序,适用于一个reduce distribute by :指定字段分区 //必须设置reduce个数,不然只会有一个reduce,一个分区 sort by:reduce排序 partition by :用于窗口函数中 cluster by:当分区字段和排序字段相同时,可以用cluster by clustered by:分桶关键字 select * from t1 order by sal DESC,deptno ASC; //全局排序,适用于一个reduce ; //设置reduce个数,默认为1,系统默认reduce个数为分区个数 select * from t1 distribute by deptno sort by id DESC; //distribute by 分区 //sort by 排序 select classID,SUM(grade) over(partition by classID) fromtb_Student //窗口函数:over关键字 select mid, money, name from store cluster by mid ==> select mid, money, name from store distribute by mid sort by mid
⑤窗口函数
窗口函数:row_number、rank、dense_rank a a b b a a a b a b a select id,name,sal, rank()over(partition by name order by sal desc ) rp, dense_rank() over(partition by name order by sal desc ) drp, row_number()over(partition by name order by sal desc) rmp from f_test b b b b a a a a a a a 可以发现,窗口函数就是多了一列的排名列 partition by name order by sal desc <==> distribute by name sort by id desc 也就是说:partition by 和 distribute by等价,但是后面是order by 而不是sort by 窗口函数:LAG和LEAD:增加一列,列的值就是前(后)一行的某一列的值 窗口函数:first_value和last_value first_value取分组内排序后,截止到当前行,第一个值 last_value取分组内排序后,截止到当前行,最后一个值
⑥自定义正则输入
注:有时候数据并非是用 '\t' 进行分割,就需要自己清洗。 CREATE TABLE apachelog ( host string, identity string, agent string )row format serde 'org.apache.hadoop.hive.serde2.RegexSerDe' //以正则的方式建表 WITH SERDEPROPERTIES ( "input.regex" = "([^]*) ([^]*) ([^]*)" //用正则提取,每个()表示一个字段,里面是正则表达式 ) stored as textfile ;
⑦自定义HIVE函数
注:经过上一步之后,字段中保存的是“/Aug/::: +”,我们希望在select能将它变为2015-- ::37这种标准格式。 public class TestUDF extends UDF{ public String evaluate(String time) throws ParseException { String output=null; SimpleDateFormat inputDate=new SimpleDateFormat("dd/MM/yyyy:HH:mm:ss",Locale.ENGLISH); SimpleDateFormat outputDate=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); if(time==null) { return null; } if(StringUtils.isBlank(time)) { return null; } String pares=time.replaceAll("\"", ""); Date parseDate=inputDate.parse(pares); output=outputDate.format(parseDate); return output; } } 注:UDF是HIVE自定义函数的类。 、重写evaluate方法。 、打包成jar,上传到linux 、add jar /data/myudf.jar; //hive命令行 、create temporary function my_upper as "org.udf.test.Lower"; //给函数取名,并且关联包中的类 、select my_upper(name) as name from student7; //调用 自定义函数是非常重要的,对于很多数据,用函数对其清洗,使其规律化。
⑧压缩
和MR一样,都有压缩,分为map端压缩和reduce端压缩,可以在配置文件中设置。
注:为了减少网络传输,将map产生的分区文件进行压缩,然后传给reduce,执行解压。(map压缩)
为了减少输出,将reduce的结果进行压缩。(reduce压缩)
⑨文件存储格式
orc: 列式存储 testfile : 文本格式 Parquet:列式存储 二进制存储方式 注:数据压缩比例上ORC最优,相比textfile节省了50倍磁盘空间,parquet压缩性能也较好。 注:SQL查询速度而言,ORC与parquet性能较好 create table t1( id int ,name string ) partitioned by (day string) row format delimited fields terminated by '\t' stored as orc ;
3、分区与目录关联
案例:把数据直接先上传到HDFS的分区目录中,在让分区表与数据产生关联
1 上传数据后修复
1) 首先在HDFS上先创建一个目录,为如下结构
/user/hive/warehouse/dept_partition2/month=2/day=16
2) 上传数据到该目录中
dfs -put /data/dept.txt /user/hive/warehouse/dept_partition2/month=2/day=16;
3) 查询分区表,查看上传之后的数据
select * from dept_partition2 where month='2' and day='16'; //但是发现结果没有查询到任何数据
4) 执行修复命令,使HDFS上的数据与表结构相关联
msck repair table dept_partition2;
5) 再次查询分区表结果就可以查询到数据内容
select * from dept_partition2 where month='2' and day='16';
4、HIVE优化
三个方面: 、不执行mapreduce :Fetch抓取 、不用分布式去执行mapreduce :本地模式 、进行过滤 :严格模式 、多线程 :并行执行 、内存方面 :jvm重用 、开启推测执行 、合理设置map和reduce个数 ①Fetch抓取 一个简单的查询语句,是指一个没有函数、排序等功能的语句,当开启一个Fetch Task功能, 就执行一个简单的查询语句不会生成MapRreduce作业,而是直接使用FetchTask,从hdfs文件系统中进行查询输出数据,从而提高效率。 hive.fetch.task.conversion=more //hive-site中设置 ②本地模式 在hive的运行过程中,如果输入的数据量非常小,可以启动本地模式,在hive的本地节点上进行运行,不会提交任务到集群中。这样可以避免集群分配资源造成的时间浪费 等于说以前用集群,但是如果输入数据量小的话,可以只用一台机器去运行。 hive.exec.mode.local.auto=true; //hive-site中设置 hive.exec.mode.local.auto.inputbytes.max= //输入超过128M不能本地模式 ; //输入超过四个文件不能本地模式 ③表的优化 ) 大表在join小表的情况下,把小表放到join的左边,这样小表就会被缓存到map中,之后在进行大表的读取: ) 在进行join的时候尽量检查HQL语句,不要产生笛卡儿积 )数据的倾斜 合理设置map端的数量,通过切片的数量, 以及input目录中文件的数量来决定 当输入的数据存下很多小文件,小文件的数据远远小于128MB ,每个小文件都会被当成一个map任务输出, 而每个map的启动和初始化都是需要资源的,启动的任务,与要分析的任务的时长不成正比,会造成很大的资源浪费 合理的设置reduce数量 hive.exec.reducers.max= //reduce的个数 ④并行执行 Hive语句会把查询转换为多个阶段,抽样阶段,合并阶段,limit阶段。默认情况下hive只会一个任务执行一个阶段。 当某些任务不存在依赖关系时,可以开启并发执行 hive.exec.parallel=true; hive.exec.parallel.thread.number=; ⑤严格模式 hive.mapred.mode=strict; //nonstrict: 非严格模式 ) 对于分区表: 除非where语句中,包含分区字段的过滤条件,否则不让执行 ) 使用order by语句进行查询,必须包含limit 子句进行限制否则不让执行 ) 限制笛卡儿积的查询 ⑥JVM重用 对于MR任务执行的时候存在很多小文件,task任务启动就会很多,每个任务执行的时间都很短, JVM重用可以使得JVM实例在同一个JOB中重新使用N次。可以减少JVM的启动次数以及资源的消耗, <property> <name>mapreduce.job.jvm.numtasks</name> <value></value> //mapred-site文件中修改 </property> ⑦推测执行 原因:因为程序bug,负载不均衡或者资源分布不均,造成同一个job的多个task运行速度不一致,有的task运行速度明显慢于其他task。 解决:为该任务重新启动的一个,备份任务,与原来的任务一致,这两个任务同时运行,哪个任务先执行完毕,则以哪个任务为准。 前提: ) 每个task只能存在一个备份任务 ) 当前的job 必须完成task任务的5% 不适用: )任务出现了严重数据倾斜 )特殊的任务,reduce 把结果输出到数据库中 缺点:以空间换时间,当需要备份任务过多,只会更加拖累进程。当发生严重数据倾斜时,备份只会是火上浇油。
5、其他
①虚拟列 INPUT__FILE__NAME: //可以查询表中文件存储的位置 BLOCK__OFFSET__INSIDE__FILE: //表中文本中数据的偏移量 select id ,INPUT__FILE__NAME ,BLOCK__OFFSET__INSIDE__FILE from t1 注:开启之后能查出文件的存储位置和数据的偏移量信息 ② Hive Server2 注:将此HIVE开放,客户端能连 ) 启动hiveserver2的命令 bin/hiveserver2 ) 客户端连接命令 bin/beeline //首先使用beeline 命令进入到命令行模式 !connect jdbc:hive2://hadoop01:10000 hadoop sacsac //之后进行连接 链接之后,能执行HQL语句。 )API 通过写java代码,能执行HQL语句
2、sqoop
、Sqoop底层是使用mapreduce实现的,但是只是用到了map阶段,没有用到reduce阶段 思考:为什么sqoop使用mapreduce底层来实现? 答:Mapreduce是一个分布式计算框架,传输海量数据的时候效率更高 思考2:为什么sqoop只使用了map阶段没使用redue阶段? 答:Sqoop 仅仅是做数据传输,并涉及到计算,所以没有使用到redcue //导入import,导出export 、mysql---->hdfs create table to_hdfs( id int primary key not null, name varchar() not null ); insert into to_hdfs(id,name)values(,'tiantian'); //现在mysql中创建表to_hdfs和数据 insert into to_hdfs(id,name)values(,'xuewei'); insert into to_hdfs(id,name)values(,'xiaopang'); insert into to_hdfs(id,name)values(,'bangzhang'); sqoop import --connect jdbc:mysql://192.168.40.10:3306/test --username root --password root --table to_hdfs //连接mysql的test中的to_hdfs的数据 --target-dir /to_hdfs //导出到/to_hdfs文件夹下(自动创建) --fields-terminated-by '\t' \ //导出的文件分隔符为\t --num-mappers //map数量为1,导出成一个文件 //--delete-target-dir //如果输出目录存在则删除 sqoop import --connect jdbc:mysql://hadoop01:3306/test --username root --password root --table to_hdfs --fields-terminated-by '\t' --target-dir /to_hdfs2 --num-mappers --check-column id //检查列 --incremental append //是否是追加 --last-value //检查列的值 mysql中test数据库to_hdf表中id大于8的才会被追加到/to_hdfs2 下的文件中 、mysql----->hdfs sqoop import --connect jdbc:mysql://s10:3306/test --username root --password root --table to_hdfs --target-dir /to_hdfs3 --hive-import --hive-database hive_test --hive-table stu_info --num-mappers --fields-terminated-by '\t' 、hdfs----->mysql create table to_mysql( id int primary key not null, name varchar() not null ); sqoop export --connect jdbc:mysql://s10:3306/test --username root --password root --table to_mysql --export-dir /user/hive/warehouse/session_info/day= --num-mappers --input-fields-terminated-by '\t' 、job 当某些导入导出命令,经常用时,可以作为一个job保存。 sqoop job \ --create stu_info \ -- \ import --connect jdbc:mysql://hadoop01:3306/test \ --username root \ --password root \ --table to_hdfs \ --target-dir /to_hdfs4 \ --num-mappers sqoop job --list //查看已经创建的job列表 sqoop job --exec stu_info //执行job,需要输入mysql的密码