Hive 是基于hadoop的数据仓库,使用该应用程序进行相关的静态数据分析,不需要快速响应出结果,而且数据本身不会频繁变化。
如需实现oltp相关的记录级别的更新、插入、删除可以使用Hbase。
hive> CREATE DATABASE IF NOT EXISTS test;
可以通过 键 - 值
对设置数据库,但是某些元数据信息不可更改,如:数据库名和数据库所在的目录位置。
hive> ALTER DATABASE test SET DBPROPERTIES('edited-by'='Cleland');
hive> DROP DATABASE IF EXISTS test; -- 删除数据库(数据库下无数据表,如有会执行失败)hive> DROP DATABASE IF EXISTS test CASCADE; -- 强制删除数据库(先清表再删除数据库)
内部表vs外部表
目前使用阿里的 EMR+OSS 计算与存储分离,所以一些关键的表都使用的是外部表。
内部表
-- 建表DROP TABLE IF EXISTS test.employees;CREATE TABLE IF NOT EXISTS test.employees( emp_no INT COMMENT '员工ID', birth_date STRING COMMENT '出生日期', first_name STRING COMMENT '名', last_name STRING COMMENT '姓', gender STRING COMMENT '性别', hire_date STRING COMMENT '入职时间', title STRING COMMENT '职称')ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'TBLPROPERTIES ("skip.header.line.count"="1");-- 加载数据LOAD DATA INPATH '/data/employees.csv' OVERWRITE INTO TABLE test.employees;-- 查看数据SELECT * FROM test.employees
PS:
INPATH
前加上 LOCAL
外部表
-- 建表DROP TABLE IF EXISTS test.employees_ext;CREATE EXTERNAL TABLE IF NOT EXISTS test.employees_ext( emp_no INT COMMENT '员工ID', birth_date STRING COMMENT '出生日期', first_name STRING COMMENT '名', last_name STRING COMMENT '姓', gender STRING COMMENT '性别', hire_date STRING COMMENT '入职时间')ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'LOCATION '/data/employees_dir'TBLPROPERTIES ("skip.header.line.count"="1");-- 查看数据SELECT * FROM test.employees_ext
-- 查看表hive> USE test;hive> SHOW TABLES; hive> SHOW TABLES IN test;hive> SHOW TABLES IN test LIKE 'employees'-- 查看表的属性hive> SHOW CREATE TABLE employees; -- 查看建表语句hive> DESCRIBE EXTENDED test.employees; -- 查看详细表结果hive> DESCRIBE FORMATTED test.employees; -- 查看详细表结果,比EXTENDED内容更多。
hive> TRUNCATE test.employees; -- 清空表数据hive> DROP TABLE IF EXISTS test.employees; -- 删除表
由于 hdfs 设置了回收机制,数据会移动到 /user/$USER/.Trash
,所以 drop 表了不用怕,可从回收站中恢复(只增对表数据放在 hdfs 的情况)。
表属性
hive> ALTER TABLE test.employees RENAME TO test.employees_2; -- 表table1重名为table2hive> ALTER TABLE test.employees SET TBLPROPERTIES ('comment'='employees info') -- 修改表属性hive> ALTER TABLE test.employees SET FILEFORMAT SEQUENCFILE; -- 修改存储属性
分区(重要)
hive> SHOW PARTITIONS test.employees_part; -- 查看分区hive> ALTER TABLE test.employees_part DROP PARTITION(gender='F');hive> ALTER TABLE test.employees_part ADD PARTITION(gender='F');
1、直接将数据放入表指定或默认的位置(内部表:/user/hive/warehouse)
2、使用 load
hive> LOAD DATA INPATH '/data/employees.csv' OVERWRITE INTO TABLE test.employees; -- hdfs hive> LOAD DATA LOCAL INPATH '/data/employees.csv' OVERWRITE INTO TABLE test.employees; -- 本地
3、分区表,指定分区路径
见上文【表操作】->【修改表】-> 【分区】
4、将查询数据插入表
CREATE TABLE test.employees_10000 LIKE test.employees; INSERT OVERWRITE TABLE test.employees_10000 SELECT * FROM test.employees LIMIT 10000
5、快速复制表
非分区复制
create table t_copy as select * from t_temp
分区复制
create table t_copy like t_part; -- 复制表结果
dfs -cp /user/hive/warehouse/test.db/t_part/* /user/hive/warehouse/test.db/t_copy/; -- 复制表数据
msck repair table t_copy; -- 修复分区
-- 聚合函数count(expr)sum(col)avg(col)min(col)max(col)collect_set(col) -- 返回集合col元素排重后的数组(select非groupby的字段时可使用)-- 日期函数to_date(STRING timestamp) -- 返回时间字符串的日期部分,例如:to_date('1970-01-01 00:00:00')='1970-01-01'datediff(STRING start_date, STRING end_date) -- 计算两个日期之间的天数,例如:datediff('2018-04-04', '2018-04-01')=3date_add(STRING start_date, INT day) -- 从日期减去day天数,例如:date_add('2018-04-01', 4)='2018-04-05'date_sub(STRING start_date, INT day) -- 从日期减去day天数,例如:date_sub('2018-04-20', 4)='2018-04-16'year(STRING date) -- 返回时间字符串中的年month(STRING date) -- 返回时间字符串中的月day(STRING date) -- 返回时间字符串中的日期hour(STRING date) -- 返回时间字符串中的时minute(STRING date) -- 返回时间字符串中的分second(STRING date) -- 返回时间字符串中的秒weekofyear(STRING date) -- 一年的第几周-- 其他内置concat(STRING s1, STRING s2)concat_ws(STRING separator, STRING s1, STRING s2)format_number(NUMBER x, INT d)get_json_object(STRING json_string, STRING PATH)length(STRING s)ltrim(STRING s) -- 将字符串s前面出现的空格全部去除掉rtrim(STRING s) -- 将字符串s后面出现的空格全部去除掉trim(STRING s) -- 将字符串s前后面出现的空格全部去除掉
hive 不支持子查询,需要将子查询转化为外层 join 的形式。
SELECT ti.emp_no ,count(1) as numFROM test.employees emINNER JOIN test.title ti on ti.emp_no=em.emp_noGROUP BY ti.emp_noHAVING num>1ORDER BY num DESC
select 选取的非数值字段维度只能是 group 的维度,不能写入其他维度。
-- 建立表DROP TABLE IF EXISTS test.employees_part;CREATE TABLE IF NOT EXISTS test.employees_part( emp_no INT COMMENT '员工ID', birth_date STRING COMMENT '出生日期', first_name STRING COMMENT '名', last_name STRING COMMENT '姓', hire_date STRING COMMENT '入职时间')PARTITIONED BY(gender STRING)CLUSTERED BY (emp_no) SORTED BY(emp_no) INTO 10 BUCKETS-- 插入数据SET hive.exec.dynamic.partition=true;-- 严格型必需包括至少一个静态分区 SET hive.exec.dynamic.partition.mode=nostrict;SET hive.exec.max.dynamic.partitions=100000;SET hive.exec.max.dynamic.partitions.pernode=100000;INSERT OVERWRITE TABLE test.employees_part PARTITION (gender)SELECT emp_no ,birth_date ,first_name ,last_name ,hire_date ,genderFROM test.employees
hive 中的数据使用压缩的好处(执行查询时会自动解压):
查看集群的支持的压缩算法.
$ hive -e "set io.compression.codecs"io.compression.codecs=org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.BZip2Codec,org.apache.hadoop.io.compress.DeflateCodec,org.apache.hadoop.io.compress.SnappyCodec,org.apache.hadoop.io.compress.Lz4Codec
常用的压缩算法:
建表时申明文件的存储格式,默认为 TEXTFILE。如使用压缩常使用分块压缩 SEQUENCEFILE。
CREATE TABLE A( ...)STORED AS SEQUENCEFILE
数据处理的中间过程和结果使用Snappy算法进行压缩。
-- 任务中间压缩set hive.exec.compress.intermediate=true;set hive.intermediate.compression.codec=org.apache.hadoop.io.compress.SnappyCodec;set hive.intermediate.compression.type=BLOCK;-- map/reduce 输出压缩set hive.exec.compress.output=true;set mapred.output.compression.codec=org.apache.hadoop.io.compress.SnappyCodec;set mapred.output.compression.type=BLOCK;
Hive 原始数据为 119.2G。
DROP TABLE OLTP.house_mogu;CREATE EXTERNAL TABLE OLTP.house_mogu( id STRING, platform_config ARRAY<MAP<STRING, STRING>>, publish_time STRING, house_id STRING, room_info STRING, area_name STRING, broker_info MAP<STRING, STRING>)PARTITIONED BY(batch_num STRING)ROW FORMAT SERDE 'com.mongodb.hadoop.hive.BSONSerDe'WITH SERDEPROPERTIES ('mongo.columns.mapping'='{ "id":"_id", "platform_config":"platform_config", "publish_time":"publish_time", "house_id":"house_id", "room_info":"room_info", "area_name":"area_name", "broker_info":"broker_info" }')STORED AS INPUTFORMAT 'com.mongodb.hadoop.mapred.BSONFileInputFormat'OUTPUTFORMAT 'com.mongodb.hadoop.hive.output.HiveBSONFileOutputFormat'LOCATION 'oss://${BIGDATA_HOME}/hive/warehouse/oltp.db/house_mogu';
需要在 hbase 建立 house_width 表
CREATE EXTERNAL TABLE OLAP.house_width( house_id STRING, src_type INT, crawl_time STRING,)STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key, HSInfo:house_id, HSInfo:src_type, HSInfo:crawl_time")TBLPROPERTIES ("hbase.table.name" = "house_width");
修改 ${HIVE_HOME}/conf/hive-site.xml
配置文件(永久有效)
<property> <name>mapreduce.job.queuename</name> <value>batch</value> </property>
命令行参数(当次有效)
$ hive --hiveconf mapreduce.job.queuename=batch
在已经进入 cli 时进行参数声明(当次有效)。
hive> SET mapreduce.job.queuename=batch
首先安装第三方库
$ pip install pyhs2
测试代码
import pyhs2with pyhs2.connect(host='47.94.36.22', port=10000, authMechanism="PLAIN", user='root', password='xxxx', database='test') as conn: with conn.cursor() as cur: #Show databases print cur.getDatabases() #Execute query cur.execute("select * from title limit 10") #Return column info from query print cur.getSchema() #Fetch table results for i in cur.fetch(): print i