码迷,mamicode.com
首页 > 其他好文 > 详细

Hive基础

时间:2018-10-29 18:28:23      阅读:173      评论:0      收藏:0      [点我收藏+]

标签:引号   def   _for   NPU   最大   商品   output   本地   shel   

目录

基础

建立在Hadoop上的数据仓库(Hive的表就是HDFS的目录、数据就是HDFS的文件),定义了类似SQL的查询语言,通过它来读写和管理分布式存储的数据。它的底层执行引擎可以是MapReduce、Spark等(将SQL语句转化成M/R或者Spark语言)。

优点:简单SQL,常用于数据分析,扩展性好(计算和存储),统一的元数据管理(它创建的表或其他组件,如Spark创建的表都是通用的)

缺点:无法进行迭代式计算、数据挖掘、延迟高(不用索引、且利用MR,处理小数据没优势)、调优困难(粒度粗)

概念

  • 数据仓库:一个面相主题的(用于推荐、可视化等)、(业务数据、文档资料等经过ETL后)集成的、不可更新的、随时间不变化的数据集合,用于支持企业或组织的决策分析处理。有不同的服务器,给前端提供不同的功能,如数据查询、报表、分析等。

  • 发展阶段:简单报表(日常工作中,汇总数据)、数据集市(根据某业务部门需求进行采集整理)、数据仓库(按照数据模型,对整个企业的数据进行采集和整理)

  • 星型模型和雪花模型

    在冗余可以接受的前提下,实际运用中星型模型使用更多

    前者面相主题,非正规化的结构,多维数据集的每一个维度都直接与事实表相连接,不存在渐变维度,所以数据有一定的冗余,但效率高。如以商品为主题,其他信息包括订单、客户、厂家、物流等。冗余:如客户地址信息中,相同省份会出现重复的省份名称。

    后者在前者继承再生成星型,变成有一个或多个维表没有直接连接到事实表上,而是通过其他维表连接到事实表上。如客户的其他信息有家庭信息、地址、教育等信息。通过最大限度地减少数据存储量以及联合较小的
    维表来改善查询性能,去除了数据冗余。

  • OLTP事务和OLAP分析:操作型处理,针对具体业务在数据库联机的日常操作,通常对少数记录进行查询、修改,用户关心的是它的正常运作;分析型处理:针对某些主题的历史数据进行分析,支持管理决策。

操作型处理 分析型处理
细节的 综合的或提炼的
实体——关系(E-R)模型 星型模型或雪花模型
存取瞬间数据 存储历史数据,不包含最近的数据
可更新的 只读、只追加
一次操作一个单元 一次操作一个集合
性能要求高,响应时间短 性能要求宽松
面向事务 面向分析
一次操作数据量小 一次操作数据量大
支持日常操作 支持决策需求
数据量小 数据量大
客户订单、库存水平和银行账户查询等 客户收益分析、市场细分等
  • 仓库架构

    四层:ODS(临时存储层)、PDW(数据仓库层,清洗过后的,一般遵循第三范式)、DM(数据集市层,面相主题,星形或雪花结构的数据,轻度汇总级、不存在明细数据、覆盖所有业务)、APP(应用层)

  • 元数据

    技术元数据和业务元数据。

    存储方式:每一个数据集有对应的元数据文件和元数据库。前者有较强独立性,但在大规模处理中有大量元数据文件。后者推荐。

架构
访问:命令行shell, jdbc, web gui
Driver:解释器(语法)、编译器(编译、执行计划)、优化器(优化计划)结合元数据,完成HQL查询语句计划的生成。生成的查询计划存储在HDFS中,并在随后由计算引擎来对HDFS进行查询。
Metastore:元数据(表信息、列信息、分区、数据所在目录等)存储在数据库(mysql、derby、oracle)中。

Hive在集群当中只充当client,在一台机器部署就可以了。

使用

-e:不进入 hive 的交互窗口执行 sql 语句,bin/hive -e "select id from student;"

-f:执行脚本中 sql 语句

查看hdfs:dfs -ls /;

查看历史:cat .hivehistory

属性配置

查看配置:set 配置属性名称

默认配置文件:hive-default.xml

用户自定义配置文件:hive-site.xml。用户自定义配置会覆盖默认配置

# default仓库原始位置。
<property>
    <name>hive.metastore.warehouse.dir</name>                   
    <value>/Users/flyang/Documents/self-teaching/Output/Hive</value>
    <description>location of default database for the warehouse</description>
</property>

# 查询显示列名
<property> 
    <name>hive.cli.print.header</name> 
    <value>true</value>
</property>

# 显示当前数据库
<property> 
    <name>hive.cli.print.current.db</name> 
    <value>true</value>
</property>

hive-log4j.properties:修改log的存放位置hive.log.dir=/opt/module/hive/logs

其他配置方式

启动配置hive -hiveconf mapred.reduce.tasks=10;

交互配置set mapred.reduce.tasks=100;(有些log4j配置不适用)

数据类型

Hive 数据类型 Java 数据类型 长度
TINYINT byte 1byte 有符号整数
SMALINT short 2byte 有符号整数
INT int 4byte 有符号整数
BIGINT long 8byte 有符号整数
BOOLEAN boolean 布尔类型,true 或者 false
FLOAT float 单精度浮点数
DOUBLE double 双精度浮点数
STRING string 字符系列。可以指定 字符集。可以使用单 引号或者双引号。
TIMESTAMP 时间类型
BINARY 字节数组

对于 Hive 的 String 类型相当于数据库的 varchar 类型,该类型是一个可变的字符串,不过不能声明大小上限。

Hive 有三种复杂数据类型 ARRAY、MAP 和 STRUCT。

{
    "name": "songsong", 
    "friends": ["bingbing" , "lili"] , //列表 Array
    "children": {  //键值 Map,
        "xiao song": 18 ,
        "xiaoxiao song": 19 
    }
    "address": {  //结构 Struct,
        "street": "hui long guan" , 
        "city": "beijing"
    }
}
# 例子
create table test(
name string,
friends array<string>,
children map<string, int>,
address struct<street:string, city:string>
)
row format delimited fields terminated by ‘,‘ 
collection items terminated by ‘_‘
map keys terminated by ‘:‘
lines terminated by ‘\n‘;

songsong,bingbing_lili,xiao song:18_xiaoxiao song:19,hui long guan_beijing yangyang,caicai_susu,xiao yang:18_xiaoxiao yang:19,chao yang_beijing

load data local inpath ‘/opt/module/datas/test.txt‘ into table test; # 不加local就从HDFS加载

# 访问三种集合列里的数据,以下分别是 ARRAY,MAP,STRUCT 的访问方式
select friends[1],children[‘xiao song‘],address.city from test where name="songsong";

数据转换:

  • 隐式向更大一级转换,向小的要 CAST(‘1‘ AS INT),失败得null
  • 所有整数类型、FLOAT 和 STRING 类型都可以隐式地转换成 DOUBLE
  • TINYINT、SMALLINT、INT 都可以转换为 FLOAT。
  • BOOLEAN 类型不可以转换为任何其它的类型。

DDL数据定义

数据库

默认路径在配置文件中设置

# 创建
create database if not exists db_hive;
# 指定在hdfs上的位置
create database db_hive2 location ‘/db_hive2.db‘;
# 修改数据库属性
alter database db_hive set dbproperties(‘createtime‘=‘20170830‘);
# 查看数据库,extended是显示详细
desc database extended db_hive;
# 显示数据库
show databases like ‘db_hive*‘;
# 删除,cascade是当数据库不为空时用
drop database if exists db_hive2 cascade;

创建相关

CREATE [EXTERNAL] TABLE IF NOT EXISTS db_name.table_name
    [(col_name data_type [COMMENT col_comment], ...)]
    [COMMENT table_comment]
    [PARTITIONED BY (col_name data_type [COMMENT col_comment], ...)] # 一个分区表对应一个HDFS文件夹
    [CLUSTERED BY (col_name, col_name, ...) # 创建分桶表
    [SORTED BY (col_name [ASC|DESC], ...)] INTO num_buckets BUCKETS] # 不常用
    [ROW FORMAT row_format] 
    [STORED AS file_format]
    [LOCATION hdfs_path]

# row_format
DELIMITED [FIELDS TERMINATED BY char] [COLLECTION ITEMS TERMINATED BY char]
[MAP KEYS TERMINATED BY char] [LINES TERMINATED BY char]| SERDE serde_name [WITH SERDEPROPERTIES (property_name=property_value, property_name=property_value, ...)]

# file_format
SEQUENCEFILE(压缩用,二进制序列文件)、TEXTFILE(文本)、 RCFILE(列式存储格式文件)

# 补充
# 根据查询结果建表
create table if not exists student3 as select id, name from student;
# 根据已存在的表结构创建表
create table if not exists student4 like student;
# 查看表结构,比库多个formatted
desc formatted student2;
# 删除表
drop table dept_partition;
# 删除数据
truncate table student;

外部表和内部表

在建表的同时指定一个指向实际数据的路径(LOCATION),Hive 创建内部表(MANAGED_TABLE)时,会将数据移动到数据仓库指向的路径;若创建外部表,仅记录数据所在的路径,不对数据的位置做任何改变。在删除表的时候,内部表的元数据和数据会被一起删除,而外部表只删除元数据,不删除数据。

应用:每天将收集到的网站日志定期流入 HDFS 文本文件。在外部表(原始日志表)的基础 上做大量的统计分析,用到的中间表、结果表使用内部表存储,数据通过 SELECT+INSERT 进入内部表。

分区表

# 例子
create table dept_partition(
deptno int, dname string, loc string
)
partitioned by (month string) # 二级分区表(month string, day string)
row format delimited fields terminated by ‘\t‘;

10  ACCOUNTING  NEW YORK    201707
10  ACCOUNTING  NEW YORK    201708
20  RESEARCH    DALLAS  201707
20  RESEARCH    DALLAS  201708

# 创建表时,partitioned by (month string)
load data local inpath ‘/opt/module/datas/dept.txt‘ into table default.dept_partition partition(month=‘201709‘); # 给"month=201709"这个分区表导入。二级分区表(month=‘201709‘, day=‘13‘)

# 查询分区表
select * from dept_partition where month=‘201707‘ # where month=‘201709‘ and day=‘13‘二级分区表
union [ALL | DISTINCT] 
select * from dept_partition where month=‘201708‘;

# 创建分区,不加逗号
alter table dept_partition add partition(month=‘201706‘) partition(month=‘201705‘);

# 删除,加逗号
alter table dept_partition drop partition (month=‘201705‘), partition (month=‘201706‘);

# 查看有多少个分区
show partitions dept_partition;

# 直接上传到分区目录上后建立联系,三种方法
# 上传到相应的目录dept_partition2/month=201709/day=12;
# (1)修复
msck repair table dept_partition2;
# (2)添加分区
alter table dept_partition2 add partition(month=‘201709‘, day=‘11‘);
# (3)load数据
load data local inpath ‘/opt/module/datas/dept.txt‘ into table dept_partition2 partition(month=‘201709‘,day=‘10‘);

修改表

# 改表名
alter table dept_partition2 rename to dept_partition3;

# 改列名
ALTER TABLE table_name CHANGE [COLUMN] col_old_name col_new_name column_type [COMMENT col_comment] [FIRST|AFTER column_name]

# 增加和替换列,增加在所有列后面(partition 列前)
ALTER TABLE table_name ADD|REPLACE COLUMNS (col_name data_type [COMMENT col_comment], ...)

分桶

在一个文件夹下多个文件。分区是多个文件夹下各一个文件(没有设置bucket的话)

create table stu_buck(id int, name string) 
clustered by(id) into 4 buckets
row format delimited fields terminated by ‘\t‘;

set hive.enforce.bucketing=true;
set mapreduce.job.reduces=-1;

insert into table stu_buck
select id, name from stu cluster by(id);

# 抽样,4是table 总 bucket 数的倍数或者因子。
# table 总共分了 4 份,当 y=2 时,抽取(4/2=)2 个 bucket 的数据,当 y=8 时,抽取(4/8=)1/2 个 bucket 的数据。x 的值必须小于等于 y 的值。4 out of 4表示总共抽取(4/4=)1 个 bucket 的数据,抽取第 4 个 bucket 的数据
select * from stu_buck tablesample(bucket x out of y on id);

select * from stu tablesample(0.1 percent) ;不一定适用于所有的文件格式。另外,这种抽样的最小抽样单元是一个 HDFS 数据块,小于返回全部。

DML数据操作

# 加载数据
load data [local] inpath ‘/opt/module/datas/student.txt‘ [overwrite] into table student [partition (partcol1=val1,...)];

# 插入数据
insert into table student partition(month=‘201709‘) values(‘1004‘,‘wangwu‘);

insert overwrite table student partition(month=‘201708‘) select id, name from student where month=‘201709‘;

from student
insert overwrite table student partition(month=‘201707‘)
select id, name where month=‘201709‘
insert overwrite table student partition(month=‘201706‘) select id, name where month=‘201709‘;

# 查询语句中创建表并加载数据
create table if not exists student3 as select id, name from student;

# 创建表时通过 Location 指定加载数据路径

# Import 数据到指定 Hive 表中。先用 export 导出后,再将数据导入
import table student2 partition(month=‘201709‘) from ‘path‘

# 导出
# Insert导出,将查询的结果导出到本地,格式化部分可选,没有local就到hdfs
insert overwrite local directory ‘/opt/module/datas/export/student‘ 
ROW FORMAT DELIMITED FIELDS TERMINATED BY ‘\t‘
COLLECTION ITEMS TERMINATED BY ‘\n‘ 
select * from student;
# Export 导出到 HDFS 上
export table default.student to ‘/user/hive/warehouse/export/student‘;

查询

比较运算符(有别于MySQL部分)

操作符 支持的数据类型 描述
A<=>B 基本数据类型 如果 A 和 B 都为 NULL,则返回 TRUE,其他 的和等号(=)操作符的结果一致,如果任一 为 NULL 则结果为 NULL
A RLIKE B, A REGEXP B STRING 类型 B 是一个正则表达式,如果 A 与其匹配,则返回 TRUE;反之返回 FALSE。匹配使用的是 JDK中的正则表达式接口实现的,因为正则也依据其中的规则。例如,正则表达式必须和整个字符串 A 相匹配,而不是只需与其字符串匹配

% 代表零个或多个字符(任意个字符)。

_ 代表一个字符。

Join

只支持等值连接,不支持非等值连接。

每对JOIN对象启动一个MR任务

join中不支持or

笛卡尔积:

(1)省略连接条件:select empno, deptno from emp, dept;
(2)连接条件无效
(3)所有表中的所有行互相连接

排序

Order By:全局排序,一个 MapReduce。可设置set mapreduce.job.reduces=3;,每个MR内部排序

Distribute By:类似 MR 中 partition,进行分区,结合 sort by 使用。DISTRIBUTE BY 语句要写在 SORT BY 语句之前。对于 distribute by 进行测试,一定要分配多 reduce 进行处理(set mapreduce.job.reduces=3;),否则无法看到 distribute by 的效果。

Cluster By:当 distribute by 和 sorts by 字段相同时,可以使用 cluster by 方式。只能是倒序排序,不能指定排序规则为 ASC 或者 DESC。

select * from emp order by sal desc;
# 下面两个等价
select * from emp cluster by deptno;
select * from emp distribute by deptno sort by deptno;

函数

# 查看系统自带的函数 
hive> show functions;
# 显示自带的函数的用法 
hive> desc function upper;
# 详细显示自带的函数的用法
hive> desc function extended upper;

自定义函数:(了解)

(1)继承 org.apache.hadoop.hive.ql.UDF

(2)需要实现 evaluate 函数;evaluate 函数支持重载;

UDF 必须要有返回类型,可以返回 null,但是返回类型不能为 void;

(3)模块打包,放到hive/lib。临时添加可用add jar path,永久添加要修改hive-site.sml。..为其他包,比如要处理json数据,就要另加一个jar包。

<property>
    <name>hive.aux.jars.path</name>
    <value>file:///opt/module/hive/lib/app_logs_hive.jar,...</value>
</property>

(3)在 hive 的命令行窗口注册函数

create function getdaybegin AS ‘com.my.hive.DayBeginUDF‘;

(4)验证函数

登录mysql -> metastore数据库 -> FUNCS表

(5)删函数时,要处在相应的数据库才删。drop function getdaybegin;

压缩和存储

一般选用:snappy压缩,orc 或 parquet存储

压缩

map端

1)开启 hive 中间传输数据压缩功能
set hive.exec.compress.intermediate=true;
2)开启 mapreduce 中 map 输出压缩功能
set mapreduce.map.output.compress=true;
3)设置 mapreduce 中 map 输出数据的压缩方式
set mapreduce.map.output.compress.codec= ...
reduce端

set hive.exec.compress.output=true;

set mapreduce.output.fileoutputformat.compress=true;

set mapreduce.output.fileoutputformat.compress.codec=...

终数据输出压缩为块压缩: set mapreduce.output.fileoutputformat.compress.type=BLOCK;

存储

Hive 支持的存储数的格式主要有:

行存储:TEXTFILE(默认格式,数据不做压缩,磁盘开销大,数据解析开销大。可结合 Gzip、Bzip2 使用,但不能并行查询) 、SEQUENCEFILE

列存储:ORC、PARQUET(二进制方式存储,文件中包含元数据,故自解析。按block分,每一个行组由一个 Mapper 任务处理)

压缩比:ORC > Parquet > textFile

查询速度:ORC > TextFile > Parquet

ORC

由 1 个或多个 stripe 组成,每个 stripe250MB 大小,这个 Stripe相当于 RowGroup,由Index Data,Row Data 和 Stripe Footer组成。

Index Data: 默认是每隔 1W 行做一个索引。这里做的索引应该只是记录某行的各字段在 Row Data 中的 offset。

Row Data:存的是具体的数据,先取部分行,然后对这些行按列进行存储。对每个列进行了编码,分成多个 Stream 来存储。

Stripe Footer:存的是各个 Stream 的类型,长度等信息。

每个文件有一个 File Footer,这里面存的是每个 Stripe 的行数,每个 Column 的数据类型

信息等;每个文件的尾部是一个 PostScript,这里面记录了整个文件的压缩类型以及 FileFooter 的长度信息等。在读取文件时,会 seek 到文件尾部读 PostScript,从里面解析到 File Footer 长度,再读 FileFooter,从里面解析到各个 Stripe 信息,再读各个 Stripe,即从后往前读。

PARQUET:(参考原资料)

# 存储要设置才会是非压缩,默认ZLIB压缩,也支持SNAPPY
stored as orc tblproperties ("orc.compress"="NONE");

调优

  • Fetch 抓取:对某些情况的查询可以不必使用 MapReduce 计算

    在 hive-default.xml.template 文件中 hive.fetch.task.conversion 默认是 more,在全局查找、字段查找、limit 查找等都不走mapreduce。

  • 本地模式:当输入文件个数较小时,在单台机器上处理所有的任务

    hive.exec.mode.local.auto 的值为 true

    hive.exec.mode.local.auto.inputbytes.max=128M默认

    hive.exec.mode.local.auto.input.files.max=4默认

  • 表优化

    新版的 hive 已经对小表 JOIN 大表和大表 JOIN 小表进行了优化。小表放在左边和右边已经没有明显区别

    # 让小的维度表(1000 条以下的记录条数)先进内 存
    set hive.auto.convert.join = true; # 默认true
    hive.mapjoin.smalltable.filesize 默认值是25mb
    
    # 过滤null值再join
    insert overwrite table jointable
    select n.* from (
      select * from nullidtable where id is not null 
      ) n left join ori o on n.id = o.id;
    
    # 空key转换,注意case部分
    set mapreduce.job.reduces = 5;
    insert overwrite table jointable
    select n.* from nullidtable n full join ori o on
    case when n.id is null then concat(‘hive‘, rand()) else n.id end = o.id;
  • MapJoin

    不设置就是不同join,即在 Reduce 阶段完成 join,容易数据倾斜。查看“表优化”第一条。执行过程是Task A的一个MR Local Task(客户端本地)将小表转换成一个HashTable数据结构,写入本地文件中,然后把文件加载到Distribute Cache。Task B是没有reducer的任务,里面每个mapper都去找Distribute Cache join,然后输出。

  • Group By

    map端聚合

    hive.map.aggr = true
    #  Map 端进行聚合操作的条目数目
    hive.groupby.mapaggr.checkinterval = 100000
    # 有数据倾斜的时候进行负载均衡
    hive.groupby.skewindata = true # 当选项设定为 true,生成的查询计划会有两个 MR Job。第一个map的输出随机分布到reduce,然后聚合,输出结果再分到下一个reduce。
  • Count去重

    # 数据大时不要写下面的
    select count(distinct id) from bigtable;
    # 写这个
    select count(id) from (select id from bigtable group by id) a;
  • 笛卡尔积

    尽量避免笛卡尔积,join 的时候不加 on 条件,或者无效的 on 条件,Hive 只能使用1个 reducer 来完成笛卡尔积

  • 行列过滤

    列处理:在 SELECT 中,只拿需要的列,如果有,尽量使用分区过滤,少用 SELECT *。

    行处理:在分区剪裁中,当使用外关联时,如果将副表的过滤条件写在 Where 后面, 那么就会先全表关联,之后再过滤

    # 先子查询where去掉无谓的关联
    select b.id from bigtable b
    join (select id from ori where id <= 10 ) o on b.id = o.id;
  • 动态分区调整

    Insert 数据时候,数据库自动会根据分区字段的值,将数据插入到相应的分区中

    hive.exec.dynamic.partition=true
    # 动态分区的模式,默认 strict,表示必须指定至少一个分区为静态分区,nonstrict 模式表示允许所有的动态分区。
    hive.exec.dynamic.partition.mode=nonstrict
    hive.exec.max.dynamic.partitions=1000
    # 一年数据的话, day 字段有 365,最小大于365
    hive.exec.max.dynamic.partitions.pernode=100
    hive.exec.max.created.files=100000
    hive.error.on.empty.partition=false
    
    # 创建一个分区表,一个目标分区表
    insert overwrite table ori_partitioned_target partition (p_time)
    select ...., p_time from ori_partitioned;
  • 数据倾斜

    map数量的决定因素:input 的文件总个数,input 的文件大小,集群设置的文件块大小。

    • 合理设置Map数:当小文件多时减少map数,在 map 执行前合并小文件set hive.input.format= org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;。当map逻辑复杂时,增加map数,根据computeSliteSize(Math.max(minSize,Math.min(maxSize,blocksize)))=blocksize=128M ,调低maxSize。set mapreduce.input.fileinputformat.split.maxsize=100;
    • 合理设置 Reduce 数:hive.exec.reducers.bytes.per.reducer=256000000, hive.exec.reducers.max=1009和N=min(参数 2,总输入数据量/参数 1)?。在mapred-default.xml 中修改默认值。
  • 并发执行

    set hive.exec.parallel=true;, set hive.exec.parallel.thread.number=16;默认8

  • 严格模式

    禁止 3 种类型的查询

    • 对于分区表查询,除非 where 语句中含有分区字段过滤条件来限制范围,否则不允许执行,即不允许扫描所有分区。

    • 使用了 order by 语句的查询,要求必须使用 limit 语句

    • 限制笛卡尔积的查询

      关系型数据库,JOIN 查询的时候可以不使用 ON 语句而是使用 where 语句,但Hive不行

  • JVM重用

    小文件的场景或 task 特别多的场景,执行时间都很短,每个task的启动都是要启动JVM进程,消耗大。在Hadoop的mapred-site.xml 设置,通常在10~20,根据业务来定。缺点是数据倾斜,或者某个task运行得慢时,task池不释放资源。

  • 推断执行

    推断出运行较慢的任务,为它启动一个备份任务,让两个任务同时处理同一份数据,并最终选用最先成功的。如果用户只是因为输入数据量很大而需要执行长时间,启动这个功能浪费巨大。

    在mapred-site.xml中设置mapreduce.map.speculative,也有reduce。

  • 执行计划:explain [extended]

  • 分桶、分区、压缩

参考:

尚硅谷大数据技术之 Hive

Hive基础

标签:引号   def   _for   NPU   最大   商品   output   本地   shel   

原文地址:https://www.cnblogs.com/code2one/p/9871979.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!