
Scripted file loading and dumping

Date: 2018-08-31 14:10:01

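1. Loading data into the raw table

1.1 Introduction

A large volume of log data is produced every day, and each day's logs have to be loaded, cleaned, and dumped. Once the script files are written, azkaban can schedule them.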

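1.2 Write load_data_to_hive_raw_logs.sql

This script loads data into the raw Hive table. Note the partition clause: each run loads only the data of one time slot into the matching partition. The ${hiveconf:...} placeholders are replaced with concrete time values when the script is called.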

--load_data_to_hive_raw_logs.sql
use umeng_big11 ;
load data inpath 'hdfs://mycluster/user/centos/umeng/raw_logs/${hiveconf:ym}/${hiveconf:day}/${hiveconf:hm}' into table raw_logs partition(ym=${hiveconf:ym},day=${hiveconf:day},hm=${hiveconf:hm}) ;
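
To make the substitution concrete, the sketch below prints the statement roughly as Hive would see it once the three variables are filled in. The helper name render_load_sql is hypothetical, and the path assumes the directory is named raw_logs; nothing here talks to Hive.

```shell
#!/bin/bash
# render_load_sql is a hypothetical helper, not part of the original scripts.
# It prints the LOAD DATA statement with ym, day and hm already substituted.
render_load_sql() {
  local ym=$1 day=$2 hm=$3
  # Assumption: the HDFS directory layout is .../raw_logs/<ym>/<day>/<hm>
  local path="hdfs://mycluster/user/centos/umeng/raw_logs/${ym}/${day}/${hm}"
  printf "load data inpath '%s' into table raw_logs partition(ym=%s,day=%s,hm=%s) ;\n" \
    "$path" "$ym" "$day" "$hm"
}

render_load_sql 201808 31 1410
```

Printing the statement first is a cheap way to check the path and partition values before a real load moves files.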

1.3 Write load_data_to_hive_raw_logs.sh

This script calls the sql script above, filling in the parameters before the call.

[load_data_to_hive_raw_logs.sh]
#!/bin/bash
cd /home/centos/umeng || exit 1
#with no arguments, use the time three minutes ago; otherwise take ym, day and hm from the arguments
if [[ $# -eq 0 ]] ; then
  time=$(date -d "-3 minutes" "+%Y%m-%d-%H%M")
else
  time="$1-$2-$3"
fi

#external time variable, read later by the fork script
echo -n "$time" > _time

ym=$(echo "$time" | awk -F '-' '{print $1}')
day=$(echo "$time" | awk -F '-' '{print $2}')
hm=$(echo "$time" | awk -F '-' '{print $3}')

hive -hiveconf ym=${ym} -hiveconf day=${day} -hiveconf hm=${hm} -f load_data_to_hive_raw_logs.sql
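
The three awk calls can also be written with the shell's own field splitting. The following is a minimal sketch of that alternative; split_time is a hypothetical name, and the input is assumed to have the ym-day-hm shape that the script writes to _time.

```shell
#!/bin/bash
# split_time is a hypothetical helper: it splits "ym-day-hm" on '-' with
# the read builtin and prints the three fields separated by spaces.
split_time() {
  local ym day hm
  IFS='-' read -r ym day hm <<< "$1"
  echo "$ym $day $hm"
}

# Usage: capture the three fields into variables in one step
read -r ym day hm <<< "$(split_time 201808-31-1410)"
echo "ym=$ym day=$day hm=$hm"
```

This avoids starting three extra processes, though the awk version behaves the same for well-formed input.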

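1.4 Set the execute permission and run

#add the execute permission
$>chmod +x load_data_to_hive_raw_logs.sh

#run the script for a specific time; the three arguments are ym, day and hm
$>./load_data_to_hive_raw_logs.sh 201802 04 1200

#use the default time (three minutes before now)
$>./load_data_to_hive_raw_logs.sh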
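
Because the script joins its three arguments without checking them, a call such as "2018 2 4" is passed through as ym=2018, day=2, hm=4. A small guard could catch that; the sketch below assumes the formats yyyyMM, dd and HHmm that the default branch produces, and valid_slot is a hypothetical name.

```shell
#!/bin/bash
# valid_slot is a hypothetical helper. It succeeds only when the three values
# have 6, 2 and 4 digits. It checks digit counts only, not calendar validity.
valid_slot() {
  [[ $1 =~ ^[0-9]{6}$ && $2 =~ ^[0-9]{2}$ && $3 =~ ^[0-9]{4}$ ]]
}

if valid_slot 201802 04 1200 ; then echo "accepted: 201802 04 1200" ; fi
if ! valid_slot 2018 2 4 ; then echo "rejected: 2018 2 4" ; fi
```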

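2. Forking and dumping

2.1 Overview

The logs loaded into the raw table need to be forked, time-aligned, and enriched with region information, then dumped into five log sub-tables. The sub-tables are partitioned as well, so the partition of each queried row has to be set dynamically.

2.2 Forking into the startuplogs table

2.2.1 Write the sql file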

[fork_startuplogs.sql]

--startuplog, dynamic partitions
use umeng_big11 ;
set hive.cli.print.header=true ;
add jar /soft/hive/lib/umeng_hive.jar ;
create temporary function forkstartuplogs as 'com.oldboy.umeng.hive.udf.ForkStartupLogUDTF' ;
create temporary function forkeventlogs as 'com.oldboy.umeng.hive.udf.ForkEventLogUDTF' ;
create temporary function forkerrorlogs as 'com.oldboy.umeng.hive.udf.ForkErrorLogUDTF' ;
create temporary function forkusagelogs as 'com.oldboy.umeng.hive.udf.ForkUsageLogUDTF' ;
create temporary function forkpagelogs as 'com.oldboy.umeng.hive.udf.ForkPageLogUDTF' ;
insert into startuplogs partition(ym,day,hm) 
select
  t.appChannel  ,
  t.appId       ,
  t.appPlatform ,
  t.appVersion  ,
  t.brand       ,
  t.carrier     ,
  t.country     ,
  t.createdAtMs ,
  t.deviceId    ,
  t.deviceStyle ,
  t.ipAddress   ,
  t.network     ,
  t.osType      ,
  t.province    ,
  t.screenSize  ,
  t.tenantId    , 
  date_format(cast(t.createdatms as timestamp) , 'yyyyMM') ,
  date_format(cast(t.createdatms as timestamp) , 'dd') ,
  date_format(cast(t.createdatms as timestamp) , 'HHmm') 
from
  (
    select 
      --fork function, passed in as a parameter
      ${hiveconf:func}(servertimestr , clienttimems , clientip , log)
    from 
      raw_logs 
    where 
      status = 200
      and ym=${hiveconf:ym}
      and day = ${hiveconf:day}
      and hm = ${hiveconf:hm}
  )t
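
The last three select columns decide which partition each row lands in. As a rough illustration of that mapping outside Hive, the sketch below derives the same three values from a timestamp. It assumes createdAtMs is an epoch time in milliseconds and that GNU date is available; partition_of is a hypothetical name, and UTC is forced so the result is reproducible, whereas Hive formats in its own time zone.

```shell
#!/bin/bash
# partition_of is a hypothetical helper: given epoch milliseconds it prints
# "yyyyMM dd HHmm", the three partition values, computed in UTC.
partition_of() {
  local secs=$(( $1 / 1000 ))   # drop the millisecond part
  TZ=UTC date -d "@${secs}" "+%Y%m %d %H%M"
}

partition_of 1535724601000   # prints: 201808 31 1410
```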
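
2.2.2 Write the sh file

The shell script reads the time value from the _time file and takes the sql file name and the fork function name as its arguments.

[fork_logs.sh]

#!/bin/bash
cd /home/centos/umeng || exit 1
#first argument: the sql file; second argument: the fork function
sqlfile=${1}
func=${2}
time=$(cat _time)
ym=$(echo -n "$time" | awk -F '-' '{print $1}')
day=$(echo -n "$time" | awk -F '-' '{print $2}')
hm=$(echo -n "$time" | awk -F '-' '{print $3}')

hive -hiveconf ym=${ym} -hiveconf day=${day} -hiveconf hm=${hm} -hiveconf func=${func} -f ${sqlfile}

2.2.3 Run the script
#specify the sql file and the fork function
$>./fork_logs.sh fork_startuplogs.sql forkstartuplogs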
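
In every invocation in this post the function name is the sql file name without the underscore and the extension. If that naming pattern is kept, the second argument could be derived rather than typed; the sketch below shows one way, with func_for as a hypothetical helper name.

```shell
#!/bin/bash
# func_for is a hypothetical helper. It relies on the naming pattern
# fork_<name>.sql -> fork<name> and does not check that the function exists.
func_for() {
  local base
  base=$(basename "$1" .sql)   # fork_startuplogs.sql -> fork_startuplogs
  echo "${base//_/}"           # remove the underscore -> forkstartuplogs
}

func_for fork_startuplogs.sql
```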

2.3 Forking into the eventlogs table

2.3.1 Write the sql

[fork_eventlogs.sql]

--eventlog, dynamic partitions
use umeng_big11 ;
add jar /soft/hive/lib/umeng_hive.jar ;
create temporary function forkstartuplogs as 'com.oldboy.umeng.hive.udf.ForkStartupLogUDTF' ;
create temporary function forkeventlogs as 'com.oldboy.umeng.hive.udf.ForkEventLogUDTF' ;
create temporary function forkerrorlogs as 'com.oldboy.umeng.hive.udf.ForkErrorLogUDTF' ;
create temporary function forkusagelogs as 'com.oldboy.umeng.hive.udf.ForkUsageLogUDTF' ;
create temporary function forkpagelogs as 'com.oldboy.umeng.hive.udf.ForkPageLogUDTF' ;
insert into eventlogs partition(ym,day,hm) 
select
  t.appChannel      ,
  t.appId           ,
  t.appPlatform     ,
  t.appVersion      ,   
  t.createdAtMs     ,
  t.deviceId        ,   
  t.deviceStyle     ,
  t.eventDurationSecs, 
  t.eventId         ,
  t.osType          ,
  t.tenantId        ,   
  date_format(cast(t.createdatms as timestamp) , 'yyyyMM') ,
  date_format(cast(t.createdatms as timestamp) , 'dd') ,
  date_format(cast(t.createdatms as timestamp) , 'HHmm') 
from
  (
    select 
      --fork function, passed in as a parameter
      ${hiveconf:func}(servertimestr , clienttimems , clientip , log)
    from 
      raw_logs 
    where 
      status = 200
      and ym = ${hiveconf:ym}   --year and month
      and day = ${hiveconf:day} --day
      and hm = ${hiveconf:hm}   --hour and minute
  )t

2.3.2 Run the script
#specify the sql file and the fork function
$>./fork_logs.sh fork_eventlogs.sql forkeventlogs

2.4 Forking into the errorlogs table

2.4.1 Write the sql

[fork_errorlogs.sql]

--errorlog, dynamic partitions
use umeng_big11 ;
add jar /soft/hive/lib/umeng_hive.jar ;
create temporary function forkstartuplogs as 'com.oldboy.umeng.hive.udf.ForkStartupLogUDTF' ;
create temporary function forkeventlogs as 'com.oldboy.umeng.hive.udf.ForkEventLogUDTF' ;
create temporary function forkerrorlogs as 'com.oldboy.umeng.hive.udf.ForkErrorLogUDTF' ;
create temporary function forkusagelogs as 'com.oldboy.umeng.hive.udf.ForkUsageLogUDTF' ;
create temporary function forkpagelogs as 'com.oldboy.umeng.hive.udf.ForkPageLogUDTF' ;
insert into errorlogs partition(ym,day,hm) 
select
  t.appChannel  ,
  t.appId       ,
  t.appPlatform ,
  t.appVersion  ,
  t.createdAtMs ,
  t.deviceId    ,
  t.deviceStyle ,
  t.errorBrief  ,
  t.errorDetail ,
  t.osType      ,
  t.tenantId    ,
  date_format(cast(t.createdatms as timestamp) , 'yyyyMM') ,
  date_format(cast(t.createdatms as timestamp) , 'dd') ,
  date_format(cast(t.createdatms as timestamp) , 'HHmm') 
from
  (
    select 
      --fork function, passed in as a parameter
      ${hiveconf:func}(servertimestr , clienttimems , clientip , log)
    from 
      raw_logs 
    where 
      status = 200
      and ym = ${hiveconf:ym}   --year and month
      and day = ${hiveconf:day} --day
      and hm = ${hiveconf:hm}   --hour and minute
  )t

2.4.2 Run the script
#specify the sql file and the fork function
$>./fork_logs.sh fork_errorlogs.sql forkerrorlogs

2.5 Forking into the usagelogs table

2.5.1 Write the sql

[fork_usagelogs.sql]

--usagelog, dynamic partitions
use umeng_big11 ;
add jar /soft/hive/lib/umeng_hive.jar ;
create temporary function forkstartuplogs as 'com.oldboy.umeng.hive.udf.ForkStartupLogUDTF' ;
create temporary function forkeventlogs as 'com.oldboy.umeng.hive.udf.ForkEventLogUDTF' ;
create temporary function forkerrorlogs as 'com.oldboy.umeng.hive.udf.ForkErrorLogUDTF' ;
create temporary function forkusagelogs as 'com.oldboy.umeng.hive.udf.ForkUsageLogUDTF' ;
create temporary function forkpagelogs as 'com.oldboy.umeng.hive.udf.ForkPageLogUDTF' ;
insert into usagelogs partition(ym,day,hm) 
select
  t.appChannel          ,
  t.appId               ,
  t.appPlatform         ,
  t.appVersion          ,
  t.createdAtMs         ,
  t.deviceId            ,
  t.deviceStyle         ,
  t.osType              ,
  t.singleDownloadTraffic,
  t.singleUploadTraffic ,
  t.singleUseDurationSecs,
  t.tenantId            ,
  date_format(cast(t.createdatms as timestamp) , 'yyyyMM') ,
  date_format(cast(t.createdatms as timestamp) , 'dd') ,
  date_format(cast(t.createdatms as timestamp) , 'HHmm') 
from
  (
    select 
      --fork function, passed in as a parameter
      ${hiveconf:func}(servertimestr , clienttimems , clientip , log)
    from 
      raw_logs 
    where
      status = 200
      and ym = ${hiveconf:ym}
      and day = ${hiveconf:day}
      and hm = ${hiveconf:hm}
  )t

2.5.2 Run the script
#specify the sql file and the fork function
$>./fork_logs.sh fork_usagelogs.sql forkusagelogs

2.6 Forking into the pagelogs table

2.6.1 Write the sql

[fork_pagelogs.sql]

--pagelog, dynamic partitions
use umeng_big11 ;
add jar /soft/hive/lib/umeng_hive.jar ;
create temporary function forkstartuplogs as 'com.oldboy.umeng.hive.udf.ForkStartupLogUDTF' ;
create temporary function forkeventlogs as 'com.oldboy.umeng.hive.udf.ForkEventLogUDTF' ;
create temporary function forkerrorlogs as 'com.oldboy.umeng.hive.udf.ForkErrorLogUDTF' ;
create temporary function forkusagelogs as 'com.oldboy.umeng.hive.udf.ForkUsageLogUDTF' ;
create temporary function forkpagelogs as 'com.oldboy.umeng.hive.udf.ForkPageLogUDTF' ;
insert into pagelogs partition(ym,day,hm) 
select
  t.appChannel          ,
  t.appId               ,
  t.appPlatform         ,
  t.appVersion          ,
  t.createdAtMs         ,
  t.deviceId            ,
  t.deviceStyle         ,
  t.nextPage            ,
  t.osType              ,
  t.pageId              ,
  t.pageViewCntInSession,
  t.stayDurationSecs    ,
  t.tenantId            ,
  t.visitIndex          ,
  date_format(cast(t.createdatms as timestamp) , 'yyyyMM') ,
  date_format(cast(t.createdatms as timestamp) , 'dd') ,
  date_format(cast(t.createdatms as timestamp) , 'HHmm') 
from
  (
    select 
      --fork function, passed in as a parameter
      ${hiveconf:func}(servertimestr , clienttimems , clientip , log)
    from 
      raw_logs 
    where 
      status = 200
      and ym = ${hiveconf:ym}
      and day = ${hiveconf:day}
      and hm = ${hiveconf:hm}
  )t

2.6.2 Run the script
#specify the sql file and the fork function
$>./fork_logs.sh fork_pagelogs.sql forkpagelogs

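3. Summary

Five sql files and one shell script are written for the forking step. By passing parameters to the script, the fork and dump work for each log table is carried out dynamically.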
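
Since the five invocations differ only in the table name, they could be driven from a single loop, for example as one scheduled job. The sketch below is one possible shape, not the author's setup: fork_all and the DRY_RUN switch are hypothetical, and by default the commands are only printed.

```shell
#!/bin/bash
# DRY_RUN is a hypothetical switch: 1 (the default here) prints the commands,
# any other value runs them from the current directory.
DRY_RUN=${DRY_RUN:-1}

# fork_all is a hypothetical helper that walks the five log sub-tables.
fork_all() {
  local name
  for name in startuplogs eventlogs errorlogs usagelogs pagelogs ; do
    if [[ $DRY_RUN -eq 1 ]] ; then
      echo "./fork_logs.sh fork_${name}.sql fork${name}"
    else
      # Stop at the first failure so a broken job is not silently skipped
      ./fork_logs.sh "fork_${name}.sql" "fork${name}" || return 1
    fi
  done
}

fork_all
```

Running the forks in sequence keeps them all on the same _time value, as long as the load script is not run again in between.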

Original article: https://www.cnblogs.com/xupccc/p/9565321.html
