标签:azkaban int star header 2.4.1 指定 centos log for
由于每天都会产生大量的日志数据,需要对每天的日志进行加载与清洗以及转储,编写脚本文件后,通过azkaban进行调度即可。
加载数据到hive原生表,注意使用的是动态分区,每天只加载前一天的数据。其中${hiveconf:...}占位符部分需要在调用时替换成具体的时间值。
--load_data_to_hive_raw_logs.sql
-- Loads one time slice of raw logs from HDFS into the matching partition of
-- raw_logs. The values ym / day / hm are supplied by the caller through
-- `hive -hiveconf ym=... -hiveconf day=... -hiveconf hm=...` and are
-- substituted textually before the statement is parsed.
use umeng_big11 ;

-- Without the LOCAL keyword, LOAD DATA moves the files at the HDFS path into
-- the table's storage location rather than copying them.
load data inpath 'hdfs://mycluster/user/centos/umeng/raw_logs/${hiveconf:ym}/${hiveconf:day}/${hiveconf:hm}'
into table raw_logs
partition(ym=${hiveconf:ym},day=${hiveconf:day},hm=${hiveconf:hm}) ;
该脚本负责调用上面的sql脚本,调用前需要将参数进行填充。
[load_data_to_hive_raw_logs.sh]
#!/bin/bash
# Loads one time slice of raw logs into the Hive table raw_logs by running
# load_data_to_hive_raw_logs.sql with the slice passed as hiveconf variables.
#
# Usage:
#   load_data_to_hive_raw_logs.sh              # use the time 3 minutes ago
#   load_data_to_hive_raw_logs.sh YM DAY HM    # e.g. 201802 04 1530
#
# The chosen time is also written to ./_time as YM-DAY-HM (no trailing
# newline); fork_logs.sh reads that file to process the same slice.

# Abort if the working directory is missing, otherwise _time and the SQL file
# would be resolved relative to whatever directory the scheduler started in.
cd /home/centos/umeng || exit 1

if [[ $# -eq 0 ]] ; then
    time=$(date -d "-3 minutes" "+%Y%m-%d-%H%M")
elif [[ $# -ge 3 ]] ; then
    time="$1-$2-$3"
else
    # One or two arguments would yield an incomplete YM-DAY-HM value.
    echo "usage: $0 [ym day hm]" >&2
    exit 1
fi

# external time variable
printf '%s' "$time" > _time

# Split YM-DAY-HM into its three components.
IFS='-' read -r ym day hm <<< "$time"

hive -hiveconf ym="${ym}" -hiveconf day="${day}" -hiveconf hm="${hm}" -f load_data_to_hive_raw_logs.sql
#增加执行权限
$>chmod +x load_data_to_hive_raw_logs.sh
#调用脚本,指定具体时间(参数依次为:年月 日 时分)
$>./load_data_to_hive_raw_logs.sh 201802 04 1530
#使用当前时间
$>./load_data_to_hive_raw_logs.sh
加载到原生表的日志需要进行叉分、时间对齐以及地域信息处理,然后分别转储到5张日志子表中。日志子表也都是分区表,因此查出来的数据需要动态指定分区。
[fork_startuplogs.sql]
-- startuplog, dynamic partitions
-- Forks the startup-log records out of one raw_logs partition and stores them
-- in startuplogs. Expected hiveconf variables: ym, day, hm (source partition)
-- and func (name of the fork function to apply).
use umeng_big11 ;
set hive.cli.print.header=true ;
-- All three partition columns are dynamic, which Hive rejects in strict mode.
set hive.exec.dynamic.partition.mode=nonstrict ;

add jar /soft/hive/lib/umeng_hive.jar ;
create temporary function forkstartuplogs as 'com.oldboy.umeng.hive.udf.ForkStartupLogUDTF' ;
create temporary function forkeventlogs as 'com.oldboy.umeng.hive.udf.ForkEventLogUDTF' ;
create temporary function forkerrorlogs as 'com.oldboy.umeng.hive.udf.ForkErrorLogUDTF' ;
create temporary function forkusagelogs as 'com.oldboy.umeng.hive.udf.ForkUsageLogUDTF' ;
create temporary function forkpagelogs as 'com.oldboy.umeng.hive.udf.ForkPageLogUDTF' ;

insert into startuplogs partition(ym,day,hm)
select
    t.appChannel ,
    t.appId ,
    t.appPlatform ,
    t.appVersion ,
    t.brand ,
    t.carrier ,
    t.country ,
    t.createdAtMs ,
    t.deviceId ,
    t.deviceStyle ,
    t.ipAddress ,
    t.network ,
    t.osType ,
    t.province ,
    t.screenSize ,
    t.tenantId ,
    -- The last three expressions feed the dynamic partition columns
    -- ym, day, hm, in that order.
    -- NOTE(review): assumes cast(bigint as timestamp) interprets createdAtMs
    -- in the intended unit (seconds vs. milliseconds) - confirm for this Hive version.
    date_format(cast(t.createdAtMs as timestamp) , 'yyyyMM') ,
    date_format(cast(t.createdAtMs as timestamp) , 'dd') ,
    date_format(cast(t.createdAtMs as timestamp) , 'HHmm')
from
(
    select
        -- fork function chosen at call time; substituted textually, so it
        -- must name one of the functions registered above
        ${hiveconf:func}(servertimestr , clienttimems , clientip , log)
    from
        raw_logs
    where
        status = 200
        and ym = ${hiveconf:ym}
        and day = ${hiveconf:day}
        and hm = ${hiveconf:hm}
) t ;
shell脚本需要从_time文件中提取时间值,然后传入sql文件名和叉分函数名。
[fork_logs.sh]
#!/bin/bash
# Runs one fork SQL script against the time slice recorded in ./_time.
#
# Usage: fork_logs.sh <sql_file> <fork_function>
#   e.g. fork_logs.sh fork_startuplogs.sql forkstartuplogs
#
# ./_time is expected to contain YM-DAY-HM, as written by
# load_data_to_hive_raw_logs.sh.

cd /home/centos/umeng || exit 1

if [[ $# -lt 2 ]] ; then
    echo "usage: $0 <sql_file> <fork_function>" >&2
    exit 1
fi

# First argument is the SQL file to run, second is the fork function name.
sql_file=$1
func=$2

# Stop early if the time file is missing instead of passing empty values to hive.
time=$(cat _time) || exit 1
IFS='-' read -r ym day hm <<< "$time"

hive -hiveconf ym="${ym}" -hiveconf day="${day}" -hiveconf hm="${hm}" -hiveconf func="${func}" -f "${sql_file}"
#指定叉分函数
$>./fork_logs.sh fork_startuplogs.sql forkstartuplogs
[fork_eventlogs.sql]
-- eventlog, dynamic partitions
-- Forks the event-log records out of one raw_logs partition and stores them
-- in eventlogs. Expected hiveconf variables: ym, day, hm (source partition)
-- and func (name of the fork function to apply).
use umeng_big11 ;
-- All three partition columns are dynamic, which Hive rejects in strict mode.
set hive.exec.dynamic.partition.mode=nonstrict ;

add jar /soft/hive/lib/umeng_hive.jar ;
create temporary function forkstartuplogs as 'com.oldboy.umeng.hive.udf.ForkStartupLogUDTF' ;
create temporary function forkeventlogs as 'com.oldboy.umeng.hive.udf.ForkEventLogUDTF' ;
create temporary function forkerrorlogs as 'com.oldboy.umeng.hive.udf.ForkErrorLogUDTF' ;
create temporary function forkusagelogs as 'com.oldboy.umeng.hive.udf.ForkUsageLogUDTF' ;
create temporary function forkpagelogs as 'com.oldboy.umeng.hive.udf.ForkPageLogUDTF' ;

insert into eventlogs partition(ym,day,hm)
select
    t.appChannel ,
    t.appId ,
    t.appPlatform ,
    t.appVersion ,
    t.createdAtMs ,
    t.deviceId ,
    t.deviceStyle ,
    t.eventDurationSecs ,
    t.eventId ,
    t.osType ,
    t.tenantId ,
    -- The last three expressions feed the dynamic partition columns
    -- ym, day, hm, in that order.
    -- NOTE(review): assumes cast(bigint as timestamp) interprets createdAtMs
    -- in the intended unit (seconds vs. milliseconds) - confirm for this Hive version.
    date_format(cast(t.createdAtMs as timestamp) , 'yyyyMM') ,
    date_format(cast(t.createdAtMs as timestamp) , 'dd') ,
    date_format(cast(t.createdAtMs as timestamp) , 'HHmm')
from
(
    select
        -- fork function chosen at call time; substituted textually, so it
        -- must name one of the functions registered above
        ${hiveconf:func}(servertimestr , clienttimems , clientip , log)
    from
        raw_logs
    where
        status = 200
        and ym = ${hiveconf:ym}    -- year and month
        and day = ${hiveconf:day}  -- day of month
        and hm = ${hiveconf:hm}    -- hour and minute
) t ;
$>#指定叉分函数
$>./fork_logs.sh fork_eventlogs.sql forkeventlogs
[fork_errorlogs.sql]
-- errorlog, dynamic partitions
-- Forks the error-log records out of one raw_logs partition and stores them
-- in errorlogs. Expected hiveconf variables: ym, day, hm (source partition)
-- and func (name of the fork function to apply).
use umeng_big11 ;
-- All three partition columns are dynamic, which Hive rejects in strict mode.
set hive.exec.dynamic.partition.mode=nonstrict ;

add jar /soft/hive/lib/umeng_hive.jar ;
create temporary function forkstartuplogs as 'com.oldboy.umeng.hive.udf.ForkStartupLogUDTF' ;
create temporary function forkeventlogs as 'com.oldboy.umeng.hive.udf.ForkEventLogUDTF' ;
create temporary function forkerrorlogs as 'com.oldboy.umeng.hive.udf.ForkErrorLogUDTF' ;
create temporary function forkusagelogs as 'com.oldboy.umeng.hive.udf.ForkUsageLogUDTF' ;
create temporary function forkpagelogs as 'com.oldboy.umeng.hive.udf.ForkPageLogUDTF' ;

insert into errorlogs partition(ym,day,hm)
select
    t.appChannel ,
    t.appId ,
    t.appPlatform ,
    t.appVersion ,
    t.createdAtMs ,
    t.deviceId ,
    t.deviceStyle ,
    t.errorBrief ,
    t.errorDetail ,
    t.osType ,
    t.tenantId ,
    -- The last three expressions feed the dynamic partition columns
    -- ym, day, hm, in that order.
    -- NOTE(review): assumes cast(bigint as timestamp) interprets createdAtMs
    -- in the intended unit (seconds vs. milliseconds) - confirm for this Hive version.
    date_format(cast(t.createdAtMs as timestamp) , 'yyyyMM') ,
    date_format(cast(t.createdAtMs as timestamp) , 'dd') ,
    date_format(cast(t.createdAtMs as timestamp) , 'HHmm')
from
(
    select
        -- fork function chosen at call time; substituted textually, so it
        -- must name one of the functions registered above
        ${hiveconf:func}(servertimestr , clienttimems , clientip , log)
    from
        raw_logs
    where
        status = 200
        and ym = ${hiveconf:ym}    -- year and month
        and day = ${hiveconf:day}  -- day of month
        and hm = ${hiveconf:hm}    -- hour and minute
) t ;
$>#指定叉分函数
$>./fork_logs.sh fork_errorlogs.sql forkerrorlogs
[fork_usagelogs.sql]
-- usagelogs, dynamic partitions
-- Forks the usage-log records out of one raw_logs partition and stores them
-- in usagelogs. Expected hiveconf variables: ym, day, hm (source partition)
-- and func (name of the fork function to apply).
use umeng_big11 ;
-- All three partition columns are dynamic, which Hive rejects in strict mode.
set hive.exec.dynamic.partition.mode=nonstrict ;

add jar /soft/hive/lib/umeng_hive.jar ;
create temporary function forkstartuplogs as 'com.oldboy.umeng.hive.udf.ForkStartupLogUDTF' ;
create temporary function forkeventlogs as 'com.oldboy.umeng.hive.udf.ForkEventLogUDTF' ;
create temporary function forkerrorlogs as 'com.oldboy.umeng.hive.udf.ForkErrorLogUDTF' ;
create temporary function forkusagelogs as 'com.oldboy.umeng.hive.udf.ForkUsageLogUDTF' ;
create temporary function forkpagelogs as 'com.oldboy.umeng.hive.udf.ForkPageLogUDTF' ;

insert into usagelogs partition(ym,day,hm)
select
    t.appChannel ,
    t.appId ,
    t.appPlatform ,
    t.appVersion ,
    t.createdAtMs ,
    t.deviceId ,
    t.deviceStyle ,
    t.osType ,
    t.singleDownloadTraffic ,
    t.singleUploadTraffic ,
    t.singleUseDurationSecs ,
    t.tenantId ,
    -- The last three expressions feed the dynamic partition columns
    -- ym, day, hm, in that order.
    -- NOTE(review): assumes cast(bigint as timestamp) interprets createdAtMs
    -- in the intended unit (seconds vs. milliseconds) - confirm for this Hive version.
    date_format(cast(t.createdAtMs as timestamp) , 'yyyyMM') ,
    date_format(cast(t.createdAtMs as timestamp) , 'dd') ,
    date_format(cast(t.createdAtMs as timestamp) , 'HHmm')
from
(
    select
        -- fork function chosen at call time; substituted textually, so it
        -- must name one of the functions registered above
        ${hiveconf:func}(servertimestr , clienttimems , clientip , log)
    from
        raw_logs
    where
        status = 200
        and ym = ${hiveconf:ym}
        and day = ${hiveconf:day}
        and hm = ${hiveconf:hm}
) t ;
$>#指定叉分函数
$>./fork_logs.sh fork_usagelogs.sql forkusagelogs
[fork_pagelogs.sql]
-- pagelog, dynamic partitions
-- Forks the page-log records out of one raw_logs partition and stores them
-- in pagelogs. Expected hiveconf variables: ym, day, hm (source partition)
-- and func (name of the fork function to apply).
use umeng_big11 ;
-- All three partition columns are dynamic, which Hive rejects in strict mode.
set hive.exec.dynamic.partition.mode=nonstrict ;

add jar /soft/hive/lib/umeng_hive.jar ;
create temporary function forkstartuplogs as 'com.oldboy.umeng.hive.udf.ForkStartupLogUDTF' ;
create temporary function forkeventlogs as 'com.oldboy.umeng.hive.udf.ForkEventLogUDTF' ;
create temporary function forkerrorlogs as 'com.oldboy.umeng.hive.udf.ForkErrorLogUDTF' ;
create temporary function forkusagelogs as 'com.oldboy.umeng.hive.udf.ForkUsageLogUDTF' ;
create temporary function forkpagelogs as 'com.oldboy.umeng.hive.udf.ForkPageLogUDTF' ;

insert into pagelogs partition(ym,day,hm)
select
    t.appChannel ,
    t.appId ,
    t.appPlatform ,
    t.appVersion ,
    t.createdAtMs ,
    t.deviceId ,
    t.deviceStyle ,
    t.nextPage ,
    t.osType ,
    t.pageId ,
    t.pageViewCntInSession ,
    t.stayDurationSecs ,
    t.tenantId ,
    t.visitIndex ,
    -- The last three expressions feed the dynamic partition columns
    -- ym, day, hm, in that order.
    -- NOTE(review): assumes cast(bigint as timestamp) interprets createdAtMs
    -- in the intended unit (seconds vs. milliseconds) - confirm for this Hive version.
    date_format(cast(t.createdAtMs as timestamp) , 'yyyyMM') ,
    date_format(cast(t.createdAtMs as timestamp) , 'dd') ,
    date_format(cast(t.createdAtMs as timestamp) , 'HHmm')
from
(
    select
        -- fork function chosen at call time; substituted textually, so it
        -- must name one of the functions registered above
        ${hiveconf:func}(servertimestr , clienttimems , clientip , log)
    from
        raw_logs
    where
        status = 200
        and ym = ${hiveconf:ym}
        and day = ${hiveconf:day}
        and hm = ${hiveconf:hm}
) t ;
$>#指定叉分函数
$>./fork_logs.sh fork_pagelogs.sql forkpagelogs
编写5个sql文件,一个shell脚本文件,通过传递参数给执行脚本,动态执行每张日志表的叉分与转储工作。
标签:azkaban int star header 2.4.1 指定 centos log for
原文地址:https://www.cnblogs.com/xupccc/p/9565321.html