码迷,mamicode.com
首页 > 其他好文 > 详细

pika的阻塞式使用

时间:2018-06-15 20:57:27      阅读:298      评论:0      收藏:0      [点我收藏+]

标签:update   开始   python   param   live   cal   sum   json   mysql数据库   

import os
import sys

# pip install kafka-python
sys.path.append("/usr/local/software/ELK")
from Util.DateEncoder import *
from kafka import KafkaProducer
from Util.TextUtil import *
from Util.MySQLHelper import *
from Util.GetAreaPartionIdUtil import *
from Util.RabbitMqUtil import *

# 区域码
AreaCode = mysql_AreaCode
MemoryDict = []


# 处理某张表中ID的数据变更
def doAction(sql):
    while True:
        try:
            dt = db.query(sql)
            if len(dt) > 0:
                # 将字段大写转为小写
                for row in dt:
                    new_dics = {}
                    for k, v in row.items():
                        new_dics[k.lower()] = v
                jstr = json.dumps(new_dics, cls=DateEncoder)
                producer.send(topic=topicName, partition=partitionId, value=jstr.encode(utf-8))
                # 提交一下
                producer.flush()
                break
        except Exception as err:
            logInfo(是不是没有连接上kafka??将休息3秒...)
            time.sleep(3)


# 通过表名,找到对应的配置文件,读取这个表的PK是什么
def GetPk(table):
    file = os.getcwd().replace(\\, /) + /Sql/ + table + .json
    if os.path.exists(file):
        jsonStr = ReadContent(file)
        obj = json.loads(jsonStr)
        # 主键
        pk = obj[pk]
        return pk
    else:
        logInfo(文件: + file + "不存在,程序无法继续!")
        sys.exit()


# 通过表名和PK的真实值,拼接出SQL语句
def GetSql(table, id):
    file = os.getcwd().replace(\\, /) + /Sql/ + table + .json
    if os.path.exists(file):
        jsonStr = ReadContent(file)
        obj = json.loads(jsonStr)
        # 主键
        pk = obj[pk]
        sql = str(obj[sql]).replace(>, =)
        sql = sql.replace("#area_code#", AreaCode).replace("order by t1.#pk#", "").replace("order by t2.#pk#",
                                                                                           "").replace(
            "order by t3.#pk#", "").replace("order by t4.#pk#", "").replace("order by #pk#", "").replace("#pk#",
                                                                                                         pk).replace(
            "#id#", str(id)).replace("#limit#", "")
        return sql
    else:
        logInfo(文件: + file + "不存在,程序无法继续!")
        sys.exit()


# 解析rabbitmq中的json数据,知道当前变化的是哪个ID
def RabbitMqId(dataJson, pk):
    if dataJson[event] == insert:
        return int(dataJson[columns][pk][value])
    elif dataJson[event] == update:
        return int(dataJson[before][columns][pk][value])
    else:
        logInfo(不是insert也不是update,这是不行的,程序无法执行!)
        return 0


# 黄海定义的输出信息的办法,带当前时间
def logInfo(msg):
    i = datetime.datetime.now()
    print(" %s            %s" % (i, msg))


if __name__ == __main__:
    logInfo(开始获取主机对应的PartitionId...)
    partitionId = GetAreaPartitionId()
    logInfo(成功获取主机的对应PartitionId= + str(partitionId))
    # 队列名
    queue_Name = kafka_queue
    # 交换机名
    switch_Name = elk_switch
    # 声明mysql数据库
    db = MySQLHelper()

    while True:
        try:
            # 声明Kafka生产者
            producer = KafkaProducer(bootstrap_servers=kafka_servers)
            # 统一的topic名称
            topicName = dsideal_db
            break
        except:
            time.sleep(3)
            logInfo(没有正确连接到Kafka,3秒后将重试...)

    while True:
        try:
            # 准备连接到Rabbitmq
            logInfo("正在连接到RabbitMQ...")
            credentials = pika.PlainCredentials(RabbitMq_User, RabbitMq_Password)
            connection = pika.BlockingConnection(pika.ConnectionParameters(RabbitMq_IP, int(RabbitMq_Port), /, credentials))
            channel = connection.channel()
            logInfo("成功连接到RabbitMQ!正在阻塞等待消息..")
            for method_frame, properties, body in channel.consume(queue_Name):
                # 阻塞式获取消息
                # 将body转为json
                logInfo("发现RabbiMQ中的消息!正在处理...")
                dataJson = json.loads(body.decode(encoding=utf-8).lower())
                tableName = str(dataJson[table])
                event = str(dataJson[event])
                # 1、获取pk是什么字段名称
                pk = GetPk(tableName)
                # 2、解析rabbitmq中的json数据,知道这个主键的对应真实值是什么?
                id = RabbitMqId(dataJson, pk)
                # 3、组装查询的sql
                sql = GetSql(tableName, id)
                # 4、处理这个表中ID的数据变更
                doAction(sql)
                # 成功处理
                logInfo(成功处理 + tableName +  + event + "事件一条!id=" + str(id))
                # 确认收到这条消息
                channel.basic_ack(method_frame.delivery_tag)
                break
            requeued_messages = channel.cancel()
            connection.close()
        except Exception as err:
            logInfo("发生了异常:"+str(err)+",将休息10秒...")
            time.sleep(10000)

 

pika的阻塞式使用

标签:update   开始   python   param   live   cal   sum   json   mysql数据库   

原文地址:https://www.cnblogs.com/littlehb/p/9188847.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!