The Hive query operations are documented in Select, and the insert operations are documented in Inserting data into Hive Tables from queries and Writing data into the filesystem from queries.
For all the active users, one can use the query of the following form:
INSERT OVERWRITE TABLE user_active SELECT user.* FROM user WHERE user.active = 1 ; |
Note that unlike SQL, we always insert the results into a table. We will illustrate later how the user can inspect these results and even dump them to a local file. You can also run the following query on Hive CLI:
SELECT user.* FROM user WHERE user.active = 1 ; |
This will be internally rewritten to some temporary file and displayed to the Hive client side.
What partitions to use in a query is determined automatically by the system on the basis of where clause conditions on partition columns. For example, in order to get all the page_views in the month of 03/2008 referred from domain xyz.com, one could write the following query:
INSERT OVERWRITE TABLE xyz_com_page_views SELECT page_views.* FROM page_views WHERE page_views.date >= ‘2008-03-01‘ AND page_views.date <= ‘2008-03-31‘ AND page_views.referrer_url like ‘%xyz.com‘ ; |
Note that page_views.date is used here because the table (above) was defined with PARTITIONED BY(date DATETIME, country STRING) ; if you name your partition something different, don‘t expect .date to do what you think!
In order to get a demographic breakdown (by gender) of page_view of 2008-03-03 one would need to join the page_view table and the user table on the userid column. This can be accomplished with a join as shown in the following query:
INSERT OVERWRITE TABLE pv_users SELECT pv.*, u.gender, u.age FROM user u JOIN page_view pv ON (pv.userid = u.id) WHERE pv.date = ‘2008-03-03‘ ; |
In order to do outer joins the user can qualify the join with LEFT OUTER, RIGHT OUTER or FULL OUTER keywords in order to indicate the kind of outer join (left preserved, right preserved or both sides preserved). For example, in order to do a full outer join in the query above, the corresponding syntax would look like the following query:
INSERT OVERWRITE TABLE pv_users SELECT pv.*, u.gender, u.age FROM user u FULL OUTER JOIN page_view pv ON (pv.userid = u.id) WHERE pv.date = ‘2008-03-03‘ ; |
In order check the existence of a key in another table, the user can use LEFT SEMI JOIN as illustrated by the following example.
INSERT OVERWRITE TABLE pv_users SELECT u.* FROM user u LEFT SEMI JOIN page_view pv ON (pv.userid = u.id) WHERE pv.date = ‘2008-03-03‘ ; |
In order to join more than one tables, the user can use the following syntax:
INSERT OVERWRITE TABLE pv_friends SELECT pv.*, u.gender, u.age, f.friends FROM page_view pv JOIN user u ON (pv.userid = u.id) JOIN friend_list f ON (u.id = f.uid) WHERE pv.date = ‘2008-03-03‘ ; |
Note that Hive only supports equi-joins. Also it is best to put the largest table on the rightmost side of the join to get the best performance.
In order to count the number of distinct users by gender one could write the following query:
INSERT OVERWRITE TABLE pv_gender_sum SELECT pv_users.gender, count (DISTINCT pv_users.userid) FROM pv_users GROUP BY pv_users.gender; |
Multiple aggregations can be done at the same time, however, no two aggregations can have different DISTINCT columns .e.g while the following is possible
INSERT OVERWRITE TABLE pv_gender_agg SELECT pv_users.gender, count(DISTINCT pv_users.userid), count(*), sum(DISTINCT pv_users.userid) FROM pv_users GROUP BY pv_users.gender; |
however, the following query is not allowed
INSERT OVERWRITE TABLE pv_gender_agg SELECT pv_users.gender, count(DISTINCT pv_users.userid), count(DISTINCT pv_users.ip) FROM pv_users GROUP BY pv_users.gender; |
The output of the aggregations or simple selects can be further sent into multiple tables or even to hadoop dfs files (which can then be manipulated using hdfs utilities). e.g. if along with the gender breakdown, one needed to find the breakdown of unique page views by age, one could accomplish that with the following query:
FROM pv_users INSERT OVERWRITE TABLE pv_gender_sum SELECT pv_users.gender, count_distinct(pv_users.userid) GROUP BY pv_users.gender INSERT OVERWRITE DIRECTORY ‘/user/data/tmp/pv_age_sum‘ SELECT pv_users.age, count_distinct(pv_users.userid) GROUP BY pv_users.age; |
The first insert clause sends the results of the first group by to a Hive table while the second one sends the results to a hadoop dfs files.
In the previous examples, the user has to know which partition to insert into and only one partition can be inserted in one insert statement. If you want to load into multiple partitions, you have to use multi-insert statement as illustrated below.
FROM page_view_stg pvs INSERT OVERWRITE TABLE page_view PARTITION(dt= ‘2008-06-08‘ , country= ‘US‘ ) SELECT pvs.viewTime, pvs.userid, pvs.page_url, pvs.referrer_url, null , null , pvs.ip WHERE pvs.country = ‘US‘ INSERT OVERWRITE TABLE page_view PARTITION(dt= ‘2008-06-08‘ , country= ‘CA‘ ) SELECT pvs.viewTime, pvs.userid, pvs.page_url, pvs.referrer_url, null , null , pvs.ip WHERE pvs.country = ‘CA‘ INSERT OVERWRITE TABLE page_view PARTITION(dt= ‘2008-06-08‘ , country= ‘UK‘ ) SELECT pvs.viewTime, pvs.userid, pvs.page_url, pvs.referrer_url, null , null , pvs.ip WHERE pvs.country = ‘UK‘ ; |
In order to load data into all country partitions in a particular day, you have to add an insert statement for each country in the input data. This is very inconvenient since you have to have the priori knowledge of the list of countries exist in the input data and create the partitions beforehand. If the list changed for another day, you have to modify your insert DML as well as the partition creation DDLs. It is also inefficient since each insert statement may be turned into a MapReduce Job.
Dynamic-partition insert (or multi-partition insert) is designed to solve this problem by dynamically determining which partitions should be created and populated while scanning the input table. This is a newly added feature that is only available from version 0.6.0. In the dynamic partition insert, the input column values are evaluated to determine which partition this row should be inserted into. If that partition has not been created, it will create that partition automatically. Using this feature you need only one insert statement to create and populate all necessary partitions. In addition, since there is only one insert statement, there is only one corresponding MapReduce job. This significantly improves performance and reduce the Hadoop cluster workload comparing to the multiple insert case.
Below is an example of loading data to all country partitions using one insert statement:
FROM page_view_stg pvs INSERT OVERWRITE TABLE page_view PARTITION(dt= ‘2008-06-08‘ , country) SELECT pvs.viewTime, pvs.userid, pvs.page_url, pvs.referrer_url, null , null , pvs.ip, pvs.country |
There are several syntactic differences from the multi-insert statement:
Semantics of the dynamic partition insert statement:
{}. Basically this partition will contain all "bad" rows whose value are not valid partition names. The caveat of this approach is that the bad value will be lost and is replaced by HIVE_DEFAULT_PARTITION
{} if you select them Hive. JIRA HIVE-1309 is a solution to let user specify "bad file" to retain the input partition column values as well. (空值使用默认的partition名字)Troubleshooting and best practices: 问题解决及最佳实践
As stated above, there are too many dynamic partitions created by a particular mapper/reducer, a fatal error could be raised and the job will be killed. The error message looks something like:
hive> set hive.exec.dynamic.partition.mode=nonstrict; hive> FROM page_view_stg pvs INSERT OVERWRITE TABLE page_view PARTITION(dt, country) SELECT pvs.viewTime, pvs.userid, pvs.page_url, pvs.referrer_url, null , null , pvs.ip, from_unixtimestamp(pvs.viewTime, ‘yyyy-MM-dd‘ ) ds, pvs.country; ... 2010 - 05 - 07 11 : 10 : 19 , 816 Stage- 1 map = 0 %, reduce = 0 % [Fatal Error] Operator FS_28 (id= 41 ): fatal error. Killing the job. Ended Job = job_201005052204_28178 with errors ... |
The problem of this that one mapper will take a random set of rows and it is very likely that the number of distinct (dt, country) pairs will exceed the limit of hive.exec.max.dynamic.partitions.pernode. One way around it is to group the rows by the dynamic partition columns in the mapper and distribute them to the reducers where the dynamic partitions will be created. In this case the number of distinct dynamic partitions will be significantly reduced. The above example query could be rewritten to:
hive> set hive.exec.dynamic.partition.mode=nonstrict; hive> FROM page_view_stg pvs INSERT OVERWRITE TABLE page_view PARTITION(dt, country) SELECT pvs.viewTime, pvs.userid, pvs.page_url, pvs.referrer_url, null , null , pvs.ip, from_unixtimestamp(pvs.viewTime, ‘yyyy-MM-dd‘ ) ds, pvs.country DISTRIBUTE BY ds, country; |
This query will generate a MapReduce job rather than Map-only job. The SELECT-clause will be converted to a plan to the mappers and the output will be distributed to the reducers based on the value of (ds, country) pairs. The INSERT-clause will be converted to the plan in the reducer which writes to the dynamic partitions.
Additional documentation:
In certain situations you would want to write the output into a local file so that you could load it into an excel spreadsheet. This can be accomplished with the following command:
INSERT OVERWRITE LOCAL DIRECTORY ‘/tmp/pv_gender_sum‘ SELECT pv_gender_sum.* FROM pv_gender_sum; |
The sampling clause allows the users to write queries for samples of the data instead of the whole table. Currently the sampling is done on the columns that are specified in the CLUSTERED BY clause of the CREATE TABLE statement. In the following example we choose 3rd bucket out of the 32 buckets of the pv_gender_sum table:
INSERT OVERWRITE TABLE pv_gender_sum_sample SELECT pv_gender_sum.* FROM pv_gender_sum TABLESAMPLE(BUCKET 3 OUT OF 32 ); |
In general the TABLESAMPLE syntax looks like:
y has to be a multiple or divisor of the number of buckets in that table as specified at the table creation time. The buckets chosen are determined if bucket_number module y is equal to x. So in the above example the following tablesample clause
would pick out the 3rd and 19th buckets. The buckets are numbered starting from 0.
On the other hand the tablesample clause
would pick out half of the 3rd bucket.
The language also supports union all, e.g. if we suppose there are two different tables that track which user has published a video and which user has published a comment, the following query joins the results of a union all with the user table to create a single annotated stream for all the video publishing and comment publishing events:
INSERT OVERWRITE TABLE actions_users SELECT u.id, actions.date FROM ( SELECT av.uid AS uid FROM action_video av WHERE av.date = ‘2008-06-03‘ UNION ALL SELECT ac.uid AS uid FROM action_comment ac WHERE ac.date = ‘2008-06-03‘ ) actions JOIN users u ON(u.id = actions.uid); |
Array columns in tables can be as follows:
CREATE TABLE array_table (int_array_column ARRAY< INT >); |
Assuming that pv.friends is of the type ARRAY<INT> (i.e. it is an array of integers), the user can get a specific element in the array by its index as shown in the following command:
SELECT pv.friends[ 2 ] FROM page_views pv; |
The select expression gets the third item in the pv.friends array.
The user can also get the length of the array using the size function as shown below:
SELECT pv.userid, size(pv.friends) FROM page_view pv; |
Maps provide collections similar to associative arrays. Such structures can only be created programmatically currently. We will be extending this soon. For the purpose of the current example assume that pv.properties is of the type map<String, String> i.e. it is an associative array from strings to string. Accordingly, the following query:
INSERT OVERWRITE page_views_map SELECT pv.userid, pv.properties[ ‘page type‘ ] FROM page_views pv; |
can be used to select the ‘page_type‘ property from the page_views table.
Similar to arrays, the size function can also be used to get the number of elements in a map as shown in the following query:
SELECT size(pv.properties) FROM page_view pv; |
Users can also plug in their own custom mappers and reducers in the data stream by using features natively supported in the Hive language. e.g. in order to run a custom mapper script - map_script - and a custom reducer script - reduce_script - the user can issue the following command which uses the TRANSFORM clause to embed the mapper and the reducer scripts.
Note that columns will be transformed to string and delimited by TAB before feeding to the user script, and the standard output of the user script will be treated as TAB-separated string columns. User scripts can output debug information to standard error which will be shown on the task detail page on hadoop.
FROM ( FROM pv_users MAP pv_users.userid, pv_users.date USING ‘map_script‘ AS dt, uid CLUSTER BY dt) map_output INSERT OVERWRITE TABLE pv_users_reduced REDUCE map_output.dt, map_output.uid USING ‘reduce_script‘ AS date, count; |
Sample map script (weekday_mapper.py )
import sys import datetime for line in sys.stdin: line = line.strip() userid, unixtime = line.split( ‘\t‘ ) weekday = datetime.datetime.fromtimestamp( float (unixtime)).isoweekday() print ‘,‘ .join([userid, str(weekday)]) |
Of course, both MAP and REDUCE are "syntactic sugar" for the more general select transform. The inner query could also have been written as such:
SELECT TRANSFORM(pv_users.userid, pv_users.date) USING ‘map_script‘ AS dt, uid CLUSTER BY dt FROM pv_users; |
Schema-less map/reduce: If there is no "AS" clause after "USING map_script", Hive assumes the output of the script contains 2 parts: key which is before the first tab, and value which is the rest after the first tab. Note that this is different from specifying "AS key, value" because in that case value will only contains the portion between the first tab and the second tab if there are multiple tabs.
In this way, we allow users to migrate old map/reduce scripts without knowing the schema of the map output. User still needs to know the reduce output schema because that has to match what is in the table that we are inserting to.
FROM ( FROM pv_users MAP pv_users.userid, pv_users.date USING ‘map_script‘ CLUSTER BY key) map_output INSERT OVERWRITE TABLE pv_users_reduced REDUCE map_output.dt, map_output.uid USING ‘reduce_script‘ AS date, count; |
Distribute By and Sort By: Instead of specifying "cluster by", the user can specify "distribute by" and "sort by", so the partition columns and sort columns can be different. The usual case is that the partition columns are a prefix of sort columns, but that is not required.
FROM ( FROM pv_users MAP pv_users.userid, pv_users.date USING ‘map_script‘ AS c1, c2, c3 DISTRIBUTE BY c2 SORT BY c2, c1) map_output INSERT OVERWRITE TABLE pv_users_reduced REDUCE map_output.c1, map_output.c2, map_output.c3 USING ‘reduce_script‘ AS date, count; |
Amongst the user community using map/reduce, cogroup is a fairly common operation wherein the data from multiple tables are sent to a custom reducer such that the rows are grouped by the values of certain columns on the tables. With the UNION ALL operator and the CLUSTER BY specification, this can be achieved in the Hive query language in the following way. Suppose we wanted to cogroup the rows from the actions_video and action_comments table on the uid column and send them to the ‘reduce_script‘ custom reducer, the following syntax can be used by the user:
FROM ( FROM ( FROM action_video av SELECT av.uid AS uid, av.id AS id, av.date AS date UNION ALL FROM action_comment ac SELECT ac.uid AS uid, ac.id AS id, ac.date AS date ) union_actions SELECT union_actions.uid, union_actions.id, union_actions.date CLUSTER BY union_actions.uid) map INSERT OVERWRITE TABLE actions_reduced SELECT TRANSFORM(map.uid, map.id, map.date) USING ‘reduce_script‘ AS (uid, id, reduced_val); |
