码迷,mamicode.com
首页 > 其他好文 > 详细

全量脚本

时间:2018-06-21 22:27:50      阅读:542      评论:0      收藏:0      [点我收藏+]

标签:person   查询   sys.path   cal   now()   bre   gre   字段   duti   

import sys

sys.path.append("..")
sys.path.append("/usr/local/software/ELK")
from Util.DateEncoder import *
from Util.MySQLHelper import *
from kafka import KafkaProducer
from Util.TextUtil import *
import os
from Util.GetAreaPartionIdUtil import *
# 黄海定义的输出信息的办法,带当前时间
def logInfo(msg):
    i = datetime.datetime.now()
    print(" %s            %s" % (i, msg))

# pip install kafka-python

# 区域码
AreaCode = mysql_AreaCode
# 每次读取的个数
BatchSize = 500

# 检查是内网还是外网
def CheckInOut():
    if mysql_AreaCode==aly:
        return  True
    else:
        return False


if __name__ == __main__:
    # 判断是不是阿里云的内网,因为阿里云的资源ID要从1开始,而其它地区的要从100000000开始
    if not CheckInOut():  # 如果不是阿里云,就是其它地区
        # 资源
        processFileName = /usr/local/software/ELK/Progress/t_resource_info.log
        logInfo(系统检测到不是阿里云主机,资源编号应该从100000000开始!)
        if not os.path.exists(processFileName):  # 并且进度文件不存在
            processId = 100000000
            WriteContent(processFileName, processId)
            logInfo(资源进度文件不存在,已成功写入100000000!)
        else:
            logInfo(资源进度文件已存在!)

        # 试题
        processFileName =  /usr/local/software/ELK/Progress/t_tk_question_info.log
        logInfo(系统检测到不是阿里云主机,试题编号应该从100000000开始!)
        if not os.path.exists(processFileName):  # 并且进度文件不存在
            processId = 100000000
            WriteContent(processFileName, processId)
            logInfo(试题进度文件不存在,已成功写入100000000!)
        else:
            logInfo(试题进度文件已存在!)

        # 试卷
        processFileName = /usr/local/software/ELK/Progress/t_sjk_paper_info.log
        logInfo(系统检测到不是阿里云主机,试卷编号应该从100000000开始!)
        if not os.path.exists(processFileName):  # 并且进度文件不存在
            processId = 100000000
            WriteContent(processFileName, processId)
            logInfo(试卷进度文件不存在,已成功写入100000000!)
        else:
            logInfo(试卷进度文件已存在!)

        # 微课
        processFileName = /usr/local/software/ELK/Progress/t_wkds_info.log
        logInfo(系统检测到不是阿里云主机,微课编号应该从100000000开始!)
        if not os.path.exists(processFileName):  # 并且进度文件不存在
            processId = 100000000
            WriteContent(processFileName, processId)
            logInfo(微课进度文件不存在,已成功写入100000000!)
        else:
            logInfo(微课进度文件已存在!)

    logInfo(开始获取主机对应的PartitionId...)
    partitionId=GetAreaPartitionId()
    logInfo(成功获取主机的对应PartitionId=+str(partitionId))

    # 开启数据库链接
    db = MySQLHelper()

    # 声明Kafka生产者
    producer = KafkaProducer(bootstrap_servers=kafka_servers)
    # 统一的topic名称
    topicName = dsideal_db
    OrderList = [t_dm_stage, t_dm_subject, t_resource_scheme, t_resource_structure, t_base_organization,
                 t_base_class, t_base_person, t_base_student, t_base_parent, t_resource_info,
                 t_social_course_restype_rela, t_hfsz_action_case, t_hfsz_yj_lib, t_base_review,
                 t_sjk_paper_info, t_tk_question_info, t_wkds_info]
    for i in range(len(OrderList)):
        # 判断一下这个MYSQL表是不是存在?
        sql="select table_name from information_schema.tables where table_name =‘%s‘"%(OrderList[i])
        dt=db.query(sql)
        if len(dt)==0:
            logInfo(""+OrderList[i]+"并不存在,将不进行此表的数据上报!")
            continue

        # 进度号
        processId = 0
        processFileName = /usr/local/software/ELK/Progress/ + OrderList[i]+.log
        if os.path.exists(processFileName):
            processId = int(ReadContent(processFileName))

        count = 0
        # 判断文件是不是存在!
        if not os.path.exists( /usr/local/software/ELK/Sql/ + OrderList[i]+".json"):
            logInfo(配置SQL文件: + processFileName + "不存在,将不上报此文档内容!")
            continue

        while True:
            # 读取sql配置文件
            jsonStr = ReadContent( /usr/local/software/ELK/Sql/ + OrderList[i]+".json")
            obj = json.loads(jsonStr)
            # 主键
            pk = obj[pk]
            # sql语句
            sql = str(obj[sql]).replace("#area_code#", AreaCode).replace("#pk#", pk).replace("#id#", str(
                processId)).replace("#limit#"," limit " + str(BatchSize))

            # 查询数据
            dt = db.query(sql.lower())

            # 将字段大写转为小写
            for row in dt:
                new_dics = {}
                for k, v in row.items():
                    new_dics[k.lower()] = v

                # 向Kafka 发送数据
                jstr = json.dumps(new_dics, cls=DateEncoder, ensure_ascii=False)
                # 如果kafka超时的话,注意要检查一下是不是手工事先创建了Topic!!!!
                producer.send(topic=topicName,partition=partitionId,value= jstr.encode(utf-8))

            if(len(dt)>0):
                # 批量提交一下
                producer.flush()
                # 计算进度
                processId = dt[len(dt) - 1][id].replace(AreaCode + "_", "")
                # 写入配置文件
                WriteContent(processFileName, processId)
                count = count + len(dt)
                logInfo("成功处理" + OrderList[i] +   + str(count) + "个!当前最大ID:" + str(processId))
                # 如果这次不满批量个数,也就是没有更多了!
                if len(dt)<BatchSize:
                    break
            else:
                # 结束就退出
                break
    # 关闭数据库链接
    db.close()
    # 关闭Kafka
    producer.close()
    logInfo(恭喜,所有任务成功完成!)

nohup /usr/bin/python3 -u /usr/local/software/ELK/PutDataToKafkaAll.py >>all.log 2>&1 &

 

0 0 * * * root  nohup /usr/bin/python3 -u /usr/local/software/ELK/PutDataToKafkaAll.py >>all.log 2>&1 &

全量脚本

标签:person   查询   sys.path   cal   now()   bre   gre   字段   duti   

原文地址:https://www.cnblogs.com/littlehb/p/9210927.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!