import sys
sys.path.append("..")
sys.path.append("/usr/local/software/ELK")
import os
import json
import datetime
from Util.DateEncoder import *
from Util.MySQLHelper import *
from Util.TextUtil import *
from Util.GetAreaPartionIdUtil import *
from kafka import KafkaProducer  # requires: pip install kafka-python


# Logging helper defined by Huang Hai: prints the message prefixed with the current time
def logInfo(msg):
    i = datetime.datetime.now()
    print(" %s %s" % (i, msg))


# Area code
AreaCode = mysql_AreaCode
# Number of rows to read per batch
BatchSize = 500


# Check whether this host is on the internal (Aliyun) network or an external one
def CheckInOut():
    if mysql_AreaCode == 'aly':
        return True
    else:
        return False


if __name__ == '__main__':
    # Decide whether this is the Aliyun internal host: on Aliyun the resource IDs start
    # from 1, while in every other region they must start from 100000000.
    if not CheckInOut():  # not Aliyun, so this is some other region
        # Resources
        processFileName = '/usr/local/software/ELK/Progress/t_resource_info.log'
        logInfo('This is not an Aliyun host, so resource IDs should start from 100000000!')
        if not os.path.exists(processFileName):  # and the progress file does not exist yet
            processId = '100000000'
            WriteContent(processFileName, processId)
            logInfo('Resource progress file did not exist; 100000000 written successfully!')
        else:
            logInfo('Resource progress file already exists!')
        # Questions
        processFileName = '/usr/local/software/ELK/Progress/t_tk_question_info.log'
        logInfo('This is not an Aliyun host, so question IDs should start from 100000000!')
        if not os.path.exists(processFileName):
            processId = '100000000'
            WriteContent(processFileName, processId)
            logInfo('Question progress file did not exist; 100000000 written successfully!')
        else:
            logInfo('Question progress file already exists!')
        # Exam papers
        processFileName = '/usr/local/software/ELK/Progress/t_sjk_paper_info.log'
        logInfo('This is not an Aliyun host, so paper IDs should start from 100000000!')
        if not os.path.exists(processFileName):
            processId = '100000000'
            WriteContent(processFileName, processId)
            logInfo('Paper progress file did not exist; 100000000 written successfully!')
        else:
            logInfo('Paper progress file already exists!')
        # Micro-lessons
        processFileName = '/usr/local/software/ELK/Progress/t_wkds_info.log'
        logInfo('This is not an Aliyun host, so micro-lesson IDs should start from 100000000!')
        if not os.path.exists(processFileName):
            processId = '100000000'
            WriteContent(processFileName, processId)
            logInfo('Micro-lesson progress file did not exist; 100000000 written successfully!')
        else:
            logInfo('Micro-lesson progress file already exists!')

    logInfo('Fetching the PartitionId for this host...')
    partitionId = GetAreaPartitionId()
    logInfo('Got the PartitionId for this host: ' + str(partitionId))
    # Open the database connection
    db = MySQLHelper()
    # Create the Kafka producer
    producer = KafkaProducer(bootstrap_servers=kafka_servers)
    # The single topic name used for everything
    topicName = 'dsideal_db'
    OrderList = ['t_dm_stage', 't_dm_subject', 't_resource_scheme', 't_resource_structure',
                 't_base_organization', 't_base_class', 't_base_person', 't_base_student',
                 't_base_parent', 't_resource_info', 't_social_course_restype_rela',
                 't_hfsz_action_case', 't_hfsz_yj_lib', 't_base_review', 't_sjk_paper_info',
                 't_tk_question_info', 't_wkds_info']
    for i in range(len(OrderList)):
        # Does this MySQL table exist at all?
        sql = "select table_name from information_schema.tables where table_name ='%s'" % (OrderList[i])
        dt = db.query(sql)
        if len(dt) == 0:
            logInfo("Table " + OrderList[i] + " does not exist; its data will not be uploaded!")
            continue
        # Progress marker (last uploaded ID)
        processId = 0
        processFileName = '/usr/local/software/ELK/Progress/' + OrderList[i] + '.log'
        if os.path.exists(processFileName):
            processId = int(ReadContent(processFileName))
        count = 0
        # Make sure the SQL configuration file exists
        sqlFileName = '/usr/local/software/ELK/Sql/' + OrderList[i] + ".json"
        if not os.path.exists(sqlFileName):
            logInfo('SQL configuration file ' + sqlFileName + " does not exist; this table's data will not be uploaded!")
            continue
        while True:
            # Read the SQL configuration file
            jsonStr = ReadContent(sqlFileName)
            obj = json.loads(jsonStr)
            # Primary key
            pk = obj['pk']
            # SQL statement, with the placeholders filled in
            sql = str(obj['sql']).replace("#area_code#", AreaCode).replace("#pk#", pk) \
                .replace("#id#", str(processId)).replace("#limit#", " limit " + str(BatchSize))
            # Query the data
            dt = db.query(sql.lower())
            # Convert upper-case field names to lower case
            for row in dt:
                new_dics = {}
                for k, v in row.items():
                    new_dics[k.lower()] = v
                # Send the row to Kafka
                jstr = json.dumps(new_dics, cls=DateEncoder, ensure_ascii=False)
                # If Kafka times out here, check whether the topic was created manually beforehand!
                producer.send(topic=topicName, partition=partitionId, value=jstr.encode('utf-8'))
            if len(dt) > 0:
                # Flush the batch
                producer.flush()
                # Update the progress marker
                processId = dt[len(dt) - 1]['id'].replace(AreaCode + "_", "")
                # Write it back to the progress file
                WriteContent(processFileName, processId)
                count = count + len(dt)
                logInfo("Processed " + OrderList[i] + ' ' + str(count) + " rows so far; current max ID: " + str(processId))
                # Less than a full batch means there is nothing more to read
                if len(dt) < BatchSize:
                    break
            else:
                # Nothing returned, this table is finished
                break
    # Close the database connection
    db.close()
    # Close the Kafka producer
    producer.close()
    logInfo('Congratulations, all tasks completed successfully!')
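The loop above assumes one JSON file per table under /usr/local/software/ELK/Sql/, with a pk field and a sql template containing the #area_code#, #pk#, #id# and #limit# placeholders. The original post does not show these files; the snippet below is only a guessed illustration of their shape for a hypothetical t_base_person.json, inferred from how the placeholders and the id column (AreaCode + "_" + primary key) are used in the script.

{
  "pk": "person_id",
  "sql": "select concat('#area_code#', '_', #pk#) as id, t.* from t_base_person t where #pk# > #id# order by #pk# #limit#"
}

With BatchSize = 500, #limit# expands to " limit 500", so each query returns at most one batch ordered by the primary key, and the last row's id (with the area-code prefix stripped) becomes the next value written to the progress file.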
Run the uploader in the background with nohup:

nohup /usr/bin/python3 -u /usr/local/software/ELK/PutDataToKafkaAll.py >>all.log 2>&1 &
Or schedule it to run daily at midnight; the extra root field means this line belongs in /etc/crontab rather than a user crontab:

0 0 * * * root nohup /usr/bin/python3 -u /usr/local/software/ELK/PutDataToKafkaAll.py >>all.log 2>&1 &
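To spot-check that rows actually arrive in the topic, a throwaway consumer can be pointed at the same topic and partition. This is not part of the original post; it is a minimal kafka-python sketch, assuming the bootstrap address and the partition number match the kafka_servers setting and the PartitionId that PutDataToKafkaAll.py logs at startup.

from kafka import KafkaConsumer, TopicPartition

# Assumed values: replace with the real kafka_servers address and the PartitionId logged by the uploader.
consumer = KafkaConsumer(bootstrap_servers='127.0.0.1:9092',
                         auto_offset_reset='earliest',
                         consumer_timeout_ms=5000)
consumer.assign([TopicPartition('dsideal_db', 0)])
# Print each record's offset and the JSON row that was produced.
for message in consumer:
    print(message.offset, message.value.decode('utf-8'))
consumer.close()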
Original article: https://www.cnblogs.com/littlehb/p/9210927.html