标签:
毕设做到后半部分,需要实现将用户在一段时间(比如1天)内产生的新数据导入HDFS的功能,这样数据仓库中的数据才能和数据库中的数据同步
在新建了一个PyDev项目后,需要如下操作(拣最主要的写):
模块的环境变量:
# -*- coding:UTF-8 -*-
#!/usr/bin/python # FileName:pro_env.py
#*************************************************** # 项目的路径 PROJECT_DIR = "/usr/local/EclipseProjects/MyBI" # 项目配置文件的路径 PROJECT_CONF_DIR = PROJECT_DIR + "/conf/" # 项目第三方库的路径 PROJECT_LIB_DIR = PROJECT_DIR + "/lib" # 项目临时文件的路径 PROJECT_TMP_DIR = PROJECT_DIR + "/temp" #*************************************************** # Hadoop的安装路径 HADOOP_HOME = "/usr/local/hadoop/" # Hadoop的命令路径 HADOOP_PATH = HADOOP_HOME + "bin/" # HIVE的安装路径 HIVE_HOME = "/opt/hive-0.9.0/" # HIVE的命令路径 HIVE_PATH = HIVE_HOME + "bin/" # Sqoop的安装路径 SQOOP_HOME = "/opt/Sqoop/" # Sqoop的命令路径 SQOOP_PATH = SQOOP_HOME + "bin/" #*************************************************** # Java的安装路径 Java_HOME = "/usr/lib/jvm/jdk1.7.0_75"
配置文件:
导入模块的配置文件主要的目的是告诉Sqoop,导入哪些表,怎么导入,我暂时需要一张表,新建一个XML文件Import.xml,type="add"表示增量导入
<?xml version="1.0" encoding="UTF-8"?> <root> <task type="add"> <table>ModifyRecords</table> </task> </root>
需要对每张表进行更细一步的配置,新建ModifyRecords.xml
<?xml version="1.0" encoding="UTF-8"?> <root> <sqoop-shell type="import"> <param key="connect">jdbc:mysql://localhost:3306/Recommend</param> <param key="username">${username}</param> <param key="password">${password}</param> <param key="target-dir">/user/hadoop/Recommend/$dt</param> <param key="query">‘select userID,movieID,pref from Recommend.ModifyRecords where modifyDate$flag"\$CONDITIONS" and $CONDITIONS‘</param> <param key="m">1</param> <param key="fields-terminated-by">‘,‘</param> </sqoop-shell> </root>
剩下的工作就是解析配置文件:
# -*- coding:UTF-8 -*- #!/usr/bin/python # FileName:import.py from com.utls.pro_env import PROJECT_CONF_DIR import time from com.utls.sqoop import SqoopUtil import xml.etree.ElementTree as ET # 其中dt为昨天的日期,将由调度模块传入 def resolve_conf(dt): # 获得配置文件名 conf_file = PROJECT_CONF_DIR + "Import.xml" # 解析配置文件 xml_tree = ET.parse(conf_file) # 获得task元素 tasks = xml_tree.findall(‘./task‘) for task in tasks: # 获得导入类型,增量导入或者全量导入 import_type = task.attrib["type"] # 获得表名集合 tables = task.findall(‘./table‘) # 用来保存待执行的Sqoop命令的集合 cmds = [] # 迭代表名集合,解析表配置文件 for i in range(len(tables)): # 表名 table_name = tables[i].text # 表配置文件名 table_conf_file = PROJECT_CONF_DIR + table_name + ".xml" # 解析表配置文件 xmlTree = ET.parse(table_conf_file) # 获取sqoop-shell节点 sqoopNodes = xmlTree.findall("./sqoop-shell") # 获取sqoop-shell节点 sqoop_cmd_type = sqoopNodes[0].attrib["type"] # 获取 praNodes = sqoopNodes[0].findall("./param") # 用来保存param信息的字典 cmap = {} for i in range(len(praNodes)): # 获得key属性的值 key = praNodes[i].attrib["key"] # 获得param标签中间的值 value = praNodes[i].text # 保存到字典中 cmap[key] = value # 首先组装成sqoop命令头 command = "sqoop " + sqoop_cmd_type # 如果为全量导入 if(import_type == "all"): # query的查询条件为<dt import_condition = dt flag = "<" # 如果为增量导入 elif (import_type == "add"): # query的查询条件为=dt import_condition = dt flag = "=" else: raise Exception # #迭代字典将param的信息拼装成字符串 for key in cmap.keys(): value = cmap[key] # 如果不是键值对形式的命令选项 if(value == None or value == "" or value == " "): value = "" # 将query的CONDITIONS替换为查询条件 if(key == "query"): value = value.replace("\$CONDITIONS", import_condition) value = value.replace("$flag", flag) # 将导入分区替换为传入的时间 if(key == "target-dir"): value = value.replace("$dt", dt) # 拼装为命令 if key == "fields-terminated-by": command += " --" + key + " " + value else: command += " --" + key + " " + value + "\\" + "\n" # 将命令加入至待执行的命令集合 cmds.append(command) return cmds # Python模块的入口:main函数 if __name__ == ‘__main__‘: # 调度模块将昨天的时间传入 dt = time.strftime("%Y-%m-%d", time.localtime(time.time())) # 解析配置文件,获得sqoop命令集合 cmds = resolve_conf(dt) # 迭代集合,执行命令 for i in range(len(cmds)): cmd = cmds[i] # 执行导入过程 SqoopUtil.execute_shell(cmd)
拼装出来的命令如下:
sqoop import --username xxxx --target-dir /user/hadoop/Recommend/2015-04-26 --m 1 --connect jdbc:mysql://localhost:3306/Recommend --query ‘select userID,movieID,pref from Recommend.ModifyRecords where modifyDate="2015-04-26" and $CONDITIONS‘ --password xxxx --fields-terminated-by ‘,‘
最后新建一个模块(不过当然写在import.py的main函数之前...),编写一个类,为该类编写一个函数,目的是用Python调用Sqoop命令:
#!/usr/bin/python # FileName sqoop.py # -*- coding:UTF-8 -*- import os class SqoopUtil(object): ‘‘‘ sqoop operation ‘‘‘ def __init__(self): pass @staticmethod def execute_shell(shell): print shell os.system(shell)
标签:
原文地址:http://www.cnblogs.com/Murcielago/p/4457775.html