最初业务需求:将文件从ftp下到本地并上传到hdfs,其中ftp到本地和本地到hdfs皆有文件完整性校验,ftp到本地的逻辑由于和业务耦合度较高,因此本文不再叙述,这里仅说一下从本地到hdfs的并发脚本相关说明及代码实现。
测试环境: RHEL6.4 x86_64 / Hadoop 2.4.0.2.1.5.0-695
部分需求说明:
1、需要提供一个文件列表,以文件的形式,每行一个文件,所有文件有一个共同的父目录,且文件是有效存在的,当然,不存在脚本也会判断并记录的。
2、需要提供一个hdfs的父路径(绝对路径),此路径用于将本地文件的父路径替换,此hdfs路径需要执行脚本的用户有读写权限,当然,没有权限的话会报错并记录日志。
3、可以并发上传,可以设置并发个数,当然,会有最大个数限制(32,可直接修改相关变量)
4、可以检测是否有已传成功的,并忽略本地上传(重复大文件特别节省时间)
5、可以根据提供的带宽计算每个文件的上传超时时间,并记录日志以便于调试合适的执行
6、上传失败的要记录日志,并计入重试列表,可自动处理重试列表
7、不匹配的文件要记录日志,并放入无效文件列表
8、一个时间段内脚本只能在系统中有一个正在运行的
9、文件完整性校验通过对于文件的大小(未找到在hdfs上直接获取某个文件的md5的方法...)
10、暂时就想到这么多了...
部分逻辑说明:
1、文件上传超时时间公式: 文件大小/总带宽(默认5MB/s,)/并发个数+60
2、由于无法把函数单独放入后台执行,因此脚本分为控制脚本和上传脚本,用户使用控制脚本即可。
3、上传日志,上传列表文件,重试列表文件,上传线程文件都放在/var/log/ftp_op目录下
4、并发进程数必须为正整数数字,如果输入错误则报错(但乱码处理仍不太理想,算bug..)。并发进程数当大与32(ctrl_put.sh: 28行max_threads=32控制)时,则强制修改为32;当上传列表行数小于进程数时,则修改进程数为上传列表行数。
6、上某个线程超时,则将其需要上传的文件放入重试列表,并kill掉其进程,删除掉其标志文件
7、上传线程的标志文件命名为 前缀_线程id_时间戳_文件大小_超时时间
8、....不说了,代码中有注释.......
注:相关日志格式;如下
1) 上传成功日志格式: 时间、具体操作函数、hdfs文件路径、状态、文件大小、上传所用时间、分配的超时时间、有(check_size)表示此文件已存在,且对比大小一致,直接标价为成功;日志如下:
2)当要处理的文件的父路径中没有参数或变量定义的路径时,日志如下:
3)当上传时,在hdfs上无法创建路径,或者无法修改权限时,日志如下:
4) 当文件大小对比失败,或hdfs dfs -put命令执行失败时,日志如下:
5) 脚本中定义了其它相关报错日志,但由于笔者测试过程未出现相关报错,也无法一一列出
脚本
脚本使用说明
计划任务使用:
* * * * * /opt/ctrl_put.sh 10 /opt/upload_thread.sh /opt/localfiles /tmp/ftpfiles
# /opt/ctrl_put.sh 10 /opt/upload_thread.sh /opt/localfiles /tmp/ftpfiles
上述命令说明:
控制脚本: /opt/ctrl_put.sh
上传教程: /opt/upload_thread.sh
线程: 10
本地文件列表中文件的父目录: /opt/localfiles
要上传到hdfs的父目录: /tmp/ftpfiles
注: /opt/ctrl_put.sh脚本的第39行是文件上传列表变量;40行是重试列表变量;38行是无效文件列表;19行是log_dir的变量,此路径需要脚本执行者有所有权限
好吧,啰嗦了这么久,见代码如下
控制脚本: ctrl_put.sh
#!/bin/bash [ -x /bin/basename ] && bn_cmd=/bin/basename [ -x /usr/bin/basename ] && bn_cmd=/usr/bin/basename [ -x /usr/bin/dirname ] && dn_cmd=/usr/bin/dirname [ -x /usr/bin/wc ] && wc_cmd=/usr/bin/wc [ -x /usr/bin/uniq ] && uq_cmd=/usr/bin/uniq [ -x /usr/bin/hdfs ] && hdp_cmd="/usr/bin/hdfs dfs" # 检查是否有本脚本pid pid_file=/tmp/`$bn_cmd $0`_ftp_op.pid if [[ -f $pid_file ]];then ps -p `cat $pid_file` &> /dev/null [[ "$?" -eq "0" ]] && echo "`$log_date` : $0 exist." && exit 0 fi echo $$ > $pid_file log_date="/bin/date +%H:%M:%S/%Y-%m-%d" log_dir=/var/log/ftp_op log_file=$log_dir/ftp_op.log threads=${1:-10} thread_script=${2:-/opt/upload_thread.sh} #check_period=${5:-10} check_period=5 timestamp="/bin/date +%s" thread_file_pre=$log_dir/threadfile max_threads=32 # 5242880 = 5M/s network_speed=5242880 net_speed=`echo $network_speed $threads|awk ‘{printf("%.0lf",$1/$2)}‘` if [[ ! -d $log_dir ]];then mkdir -p $log_dir ; mkdir_res=$? [[ $mkdir_res -ne 0 ]] && echo "$log_dir : Can‘t create directory" && exit 1 fi put_invalid_list=$log_dir/put_hdfs_invalid.list put_hdfs_list=$log_dir/put_hdfs.list put_retry_list=$log_dir/retry_put.list final_dir=${3:-/storage/disk9/localfiles} hdfs_dir=${4:-/tmp/hdfs/files} # 日志记录函数 TEE(){ /usr/bin/tee -a $log_file } # 重试列表追加入当前列表 # 如果检测到重试列表不为空,追加进上传列表 # 此函需要两个参数 $1 $2 # $1 : 重试列表文件 # $2 : 标准处理列表 RETRY_LIST(){ if [ -f $1 ];then retry_sum=`cat $1|/usr/bin/wc -l` if [[ $retry_sum -ne 0 ]];then cat $1 >> $2 rm -rf $1 fi fi } # 线程个数策略,此函数需要提供两个参数 # $1 : 原始上传列表文件 # $2 : 用户提供的线程个数 THREAD_POLICY(){ if [[ $# -ne 2 ]];then echo "`$log_date` $FUNCNAME Error: \$# 1= 2" >> $log_file return 1 fi if [[ ! -f $1 ]];then echo "`$log_date` $FUNCNAME $1 No such file" >> $log_file return 2 fi echo "$2"|grep -q ‘^[-]\?[0-9]\+$‘ if [[ $? -ne 0 ]];then echo "`$log_date` $FUNCNAME $2 Invalid number" >> $log_file return 3 fi local list_sum=`cat $1|$wc_cmd -l` if [[ $list_sum -eq 0 ]];then #echo "`$log_date` $FUNCNAME $1 is empty" >> $log_file return 0 else if [[ $2 -ge $max_threads ]];then [[ "$list_sum" -le "$max_threads" ]] && echo $list_sum || echo $max_threads else [[ "$list_sum" -le "$2" ]] && echo $list_sum || echo $2 fi fi } # 超时失败处理,此函数需要提供一个参数 # $1 : 超时线程的pid标识文件 TIMEOUT_HANDLE(){ if [[ ! -f $1 ]];then echo "`$log_date` $FUNCNAME $1 no such file" >> $log_file return 1 fi local old_pid=`/usr/bin/tail -1 $1` ps -p $old_pid &> /dev/null if [[ $? -eq 0 ]];then kill $old_pid &> /dev/null if [[ $? -eq 0 ]];then sed -n "1p" $1 >> $put_retry_list rm -rf $1 return 0 else echo "`$log_date` $FUNCNAME $2 kill $old_pid fail." >> $log_file local file_dir=`$dn_cmd $1` ; local file_name=`$bn_cmd $1` sed -n "1p" $1 >> $put_retry_list mv -f $1 $file_dir/fail_kill_$file_name ps -p $old_pid &> /dev/null if [[ $? -eq 0 ]];then echo "`$log_date` $FUNCNAME $2 kill $old_pid fail." >> $log_file return 1 else return 0 fi fi else sed -n "1p" $1 >> $put_retry_list rm -rf $1 fi # $put_hdfs_list $put_retry_list $threads } # 创建线程执行脚本所需文件,此函数需要两个参数 # $1 : 线程执行脚本id号 # $2 : 要处理的具体文件的绝对路径 CREATE_THREAD_FILE(){ if [[ $# -ne 2 ]];then echo "`$log_date` $FUNCNAME Error \$#!=2" >> $log_file return 1 fi if [[ -z $1 ]];then echo "`$log_date` $FUNCNAME $1 is empty" >> $log_file return 0 fi if [[ ! -f $2 ]];then echo "`$log_date` $FUNCNAME $2 no such file" >> $log_file return 2 fi local file_size=`/usr/bin/du -b $2|awk ‘{print $1}‘` local time_out=`echo $file_size $net_speed|awk ‘{printf("%.0lf",$1/$2+60)}‘` local thread_file="$thread_file_pre"_"$1"_`$timestamp`_"$file_size"_"$time_out" echo $2 > $thread_file if [[ $? -eq 0 ]];then echo $thread_file return 0 else echo "`$log_date` $FUNCNAME $thread_file Can‘t create file" >> $log_file return 3 fi } # 超时策略,此函数需要提供两个参数 # $1 : 当前需要创建的线程个数id # $2 : 要处理文件的绝对路径 THREAD_FILE_POLICY(){ if [[ $# -ne 2 ]];then echo "`$log_date` $FUNCNAME Error \$#!=2" >> $log_file return 1 fi if [[ -z $1 ]];then echo "`$log_date` $FUNCNAME $1 is empty" >> $log_file return 0 fi if [[ ! -f $2 ]];then echo "`$log_date` $FUNCNAME $2 no such file" >> $log_file return 2 fi local old_file=`/bin/ls "$thread_file_pre"_"$1"_* 2> /dev/null` if [[ -f $old_file ]];then local now_time=`$timestamp` local old_time=`$bn_cmd $old_file|awk -F_ ‘{print $3}‘` local file_timeout=`$bn_cmd $old_file|awk -F_ ‘{print $NF}‘` local now_timeout=`echo $now_time $old_time|awk ‘{printf("%.0lf",$1-$2)}‘` if [[ $now_timeout -le $file_timeout ]];then return 0 else if TIMEOUT_HANDLE $old_file ;then echo `CREATE_THREAD_FILE $1 $2` fi fi else echo `CREATE_THREAD_FILE $1 $2` fi } # 主控进程函数 MASTER_CTRL(){ if [[ $# -ne 4 ]];then echo "`$log_date` $FUNCNAME Error \$#!=4" >> $log_file return 1 fi while :;do RETRY_LIST $2 $1 local final_threads=`THREAD_POLICY $1 $4` [[ -z $final_threads ]] && break for t in `/usr/bin/seq 1 $final_threads`;do local file_path=`sed -n "1p" $1` echo $file_path|grep -q $final_dir if [[ $? -ne 0 ]];then echo "`$log_date` $FUNCNAME $file_path invalid file" >> $log_file echo $file_path >> $put_invalid_list sed -i "1d" $1 continue fi local thread_file=`THREAD_FILE_POLICY $t $file_path` if [[ -f $thread_file ]];then /bin/bash $3 $thread_file $final_dir $hdfs_dir $2 & sed -i "1d" $1 fi done [[ ! -z $final_threads ]] && sleep $check_period done rm -rf $pid_file } MASTER_CTRL $put_hdfs_list $put_retry_list $thread_script $threads
上传线程脚本:upload_thread.sh
#!/bin/bash [[ ! -f $1 ]] && echo "Error, Invalid File" && exit 1 [[ ! -d $2 ]] && echo "Error, Invalid Directory" && exit 1 echo $$ >> $1 [ -x /bin/basename ] && bn_cmd=/bin/basename [ -x /usr/bin/basename ] && bn_cmd=/usr/bin/basename [ -x /usr/bin/dirname ] && dn_cmd=/usr/bin/dirname [ -x /usr/bin/wc ] && wc_cmd=/usr/bin/wc [ -x /usr/bin/uniq ] && uq_cmd=/usr/bin/uniq [ -x /usr/bin/hdfs ] && hdp_cmd="/usr/bin/hdfs dfs" [ -x /usr/bin/md5sum ] && ms_cmd=/usr/bin/md5sum log_date="/bin/date +%H:%M:%S/%Y-%m-%d" log_dir=/var/log/ftp_op log_file=$log_dir/ftp_op.log put_retry_list=${4:-$log_dir/retry_put.list} timestamp="/bin/date +%s" now_timestamp=`$timestamp` [ ! -d $log_dir ] && mkdir -p $log_dir # 日志记录函数 TEE(){ /usr/bin/tee -a $log_file } # 本地和hdfs的文件大小对比函数 # 此函数需要两个参数 $1 $2 # $1为本地文件大小 $2为hdfs文件路径 HDFS_SIZE_CHECK(){ if [[ $# -ne 2 ]];then echo "`$log_date` $FUNCNAME Error \$#!=2 \$1 or \$2 is empty"|TEE return 1 fi local hdfs_size=`$hdp_cmd -du $2|awk ‘{print $1}‘` [[ $1 -eq $hdfs_size ]] && return 0 || return 1 } # 此函数需要三个参数 # $1 : hdfs的文件名 # $2 : 本地的对应文件的大小 # $3 : hdfs文件的目录 HDFS_LOCATION_CHECK(){ if [[ $# -ne 3 ]];then return 2 fi if $hdp_cmd -test -d $3 ;then if $hdp_cmd -test -f $1 ;then if HDFS_SIZE_CHECK $2 $1 ;then $hdp_cmd -rm -r -f -skipTrash $1.tmp return 1 else $hdp_cmd -rm -r -f -skipTrash $1 fi fi if $hdp_cmd -test -f $1.tmp ;then if HDFS_SIZE_CHECK $2 $1.tmp ;then $hdp_cmd -mv $1.tmp $1 return 1 else $hdp_cmd -rm -r -f -skipTrash $1.tmp return 0 fi else return 0 fi else if $hdp_cmd -mkdir -p $3 ;then $hdp_cmd -chmod 777 $3 && return 0 || return 4 else return 3 fi fi } # 此函数仅作上传处理,此函数需要五个参数 # $1 需要上传的本地文件 # $2 要上传到hdfs的目标文件 # $3 本地文件的大小byte # $4 分配的超时时间 # $5 本地文件的du -sh的统计大小 ONLY_UPLOAD(){ if [[ $# -ne 5 ]];then echo "`$log_date` $FUNCNAME Error: \$# != 5"|TEE return 1 fi if [[ ! -f $1 ]];then echo "`$log_date` $FUNCNAME Error: \$1=$1 no such file"|TEE return 1 fi $hdp_cmd -put -f $1 $2.tmp &> /dev/null if HDFS_SIZE_CHECK $3 $2.tmp ;then $hdp_cmd -mv $2.tmp $2 &> /dev/null local nowtime=`$timestamp` ; local costtime=`/usr/bin/expr $nowtime - $now_timestamp` echo "`$log_date` $FUNCNAME $2 Upload Success $5 $costtime $4" >> $log_file return 0 else $hdp_cmd -rm -r -f -skipTrash $2.tmp &> /dev/null return 1 fi } # 上传HDFS PUT_TO_HDFS(){ if [[ $# -ne 3 ]];then echo "`$log_date` $FUNCNAME Error: \$# 1= 3"|TEE return 1 elif [[ ! -f $1 ]];then echo "`$log_date` $FUNCNAME Error: \$1 Invalid File"|TEE return 1 elif [[ ! -d $2 ]];then echo "`$log_date` $FUNCNAME Error: \$2 Invalid Directory"|TEE return 1 elif [[ -z $3 ]];then echo "`$log_date` $FUNCNAME Error: \$3 is Empty"|TEE return 1 fi local list_sum=`cat $1|$wc_cmd -l` if [[ $list_sum -ne 2 ]];then echo "`$log_date` $FUNCNAME $1 is invalid pidfile"|TEE return 2 fi local local_file=`sed -n "1p" $1` local local_size=`$bn_cmd $1|awk -F_ ‘{print $4}‘` local hdfs_file=`echo $local_file|sed "s@$2@$3@1"` local hdfs_dir=`/usr/bin/dirname $hdfs_file` local valid_time=`$bn_cmd $1|awk -F_ ‘{print $NF}‘` local filesize=`/usr/bin/du -sh $local_file|awk ‘{print $1}‘` HDFS_LOCATION_CHECK $hdfs_file $local_size $hdfs_dir ; hlc_rev=$? local nowtime=`$timestamp` local costtime=`/usr/bin/expr $nowtime - $now_timestamp` case $hlc_rev in 0) ONLY_UPLOAD $local_file $hdfs_file $local_size $valid_time $filesize if [[ $? -ne 0 ]] ;then sed -n "1p" $1 >> $put_retry_list local nowtime=`$timestamp` ; local costtime=`/usr/bin/expr $nowtime - $now_timestamp` echo "`$log_date` ONLY_UPLOAD Upload Failed $filesize $costtime $valid_time" >> $log_file fi ;; 1) echo "`$log_date` $FUNCNAME $hdfs_file Upload Success $filesize $costtime $valid_time (check size)" >> $log_file ;; 2) sed -n "1p" $1 >> $put_retry_list echo "`$log_date` HDFS_LOCATION_CHECK Upload Failed: \$# != 2 $filesize $costtime $valid_time" >> $log_file ;; 3) sed -n "1p" $1 >> $put_retry_list echo "`$log_date` HDFS_LOCATION_CHECK Upload Failed: Can‘t create directory -> $hdfs_dir $filesize $costtime $valid_time" >> $log_file ;; 4) sed -n "1p" $1 >> $put_retry_list echo "`$log_date` HDFS_LOCATION_CHECK Upload Failed: Can‘t chmod 777 $hdfs_dir on the hdfs $filesize $costtime $valid_time" >> $log_file ;; esac rm -rf $1 } PUT_TO_HDFS $1 $2 $3
如果觉得代码复制麻烦,附件中提供了代码文件...转载请注明出处!谢谢!
我擦!!多上传的附件,后续编辑时不能删?还是我没找到....
本文出自 “自强不息” 博客,请务必保留此出处http://mos1989.blog.51cto.com/4226977/1589807
原文地址:http://mos1989.blog.51cto.com/4226977/1589807