码迷,mamicode.com
首页 > 数据库 > 详细

Python 连接 MongoDB，读取并解析 JSON 数据写入 MySQL

时间:2018-07-19 10:52:42      阅读:197      评论:0      收藏:0      [点我收藏+]

标签:outer   ase   for   解析json数据   ble   end   exe   continue   bigdata   

# -*- coding:utf-8 -*-

import sys
from pymongo import MongoClient
import pandas as pd
from sqlalchemy import create_engine
import MySQLdb
import json


# Python 2 only: make implicit str/unicode conversions use UTF-8 instead of
# ASCII. reload(sys) re-exposes sys.setdefaultencoding, which Python 2 hides
# at startup. Neither builtin is available like this on Python 3, so the hack
# is skipped there.
if sys.version_info[0] < 3:
    reload(sys)  # noqa: F821 -- builtin on Python 2 only
    sys.setdefaultencoding('utf-8')

# Output columns, in the order they appear in the DataFrame returned by find().
_FIELDS = (
    "id",
    "credit_id",
    "source_id",
    "req_url",
    "resp_time",
    "bank_card",
    "idCard",
    "mobile",
    "name",
    "result",
    "guid",
    "message",
    "status",
    "credit_risk_level",
    "data_flag_id",
    "data_flag_phone",
)


def _parse_record(doc):
    """Build one output row from a MongoDB document, or return None to skip it.

    A document is skipped when it has no (or an empty) ``applicant_mujin``
    sub-document, when that sub-document lacks ``req_param`` or ``response``,
    or when the decoded response has no ``body.content.applicant`` object.

    :param doc: a document yielded by ``collection.find()``.
    :returns: dict keyed by the names in ``_FIELDS``, or None.
    :raises ValueError: if ``req_param`` or ``response`` is not valid JSON.
    :raises KeyError: if a required leaf field is missing from the decoded JSON.
    """
    mujin = doc.get("applicant_mujin")
    if not mujin:
        return None

    raw_req = mujin.get("req_param")
    raw_resp = mujin.get("response")
    # Check presence BEFORE decoding: json.loads(None) raises TypeError.
    if raw_req is None or raw_resp is None:
        return None
    # NOTE(review): assumes both fields are stored as JSON strings -- confirm.
    req_param = json.loads(raw_req)
    response = json.loads(raw_resp)

    body = response.get("body") or {}
    content = body.get("content") or {}
    applicant = content.get("applicant")
    if applicant is None:
        return None

    return {
        # Top-level document fields default to "-" when absent.
        "id": doc.get("_id", "-"),
        "credit_id": doc.get("credit_id", "-"),
        "source_id": doc.get("source_id", "-"),
        # Request metadata stored next to the payloads.
        "req_url": mujin.get("req_url"),
        "resp_time": mujin.get("resp_time"),
        # Decoded request parameters.
        "bank_card": req_param["bank_card"],
        "idCard": req_param["idCard"],
        "mobile": req_param["mobile"],
        "name": req_param["name"],
        # Decoded response envelope.
        "result": body["result"],
        "guid": response["guid"],
        "message": response["message"],
        "status": response["status"],
        # Decoded applicant details.
        "credit_risk_level": applicant["credit_risk_level"],
        "data_flag_id": applicant["data_flag_id"],
        "data_flag_phone": applicant["data_flag_phone"],
    }


def find(database, table_name):
    """Read every document of a MongoDB collection and flatten it to a DataFrame.

    Each document carrying an ``applicant_mujin`` sub-document with JSON
    ``req_param``/``response`` payloads becomes one row. Documents without
    that structure are skipped (see ``_parse_record``).

    :param database: MongoDB database name.
    :param table_name: collection name within *database*.
    :returns: pandas DataFrame with the columns listed in ``_FIELDS``. It is
        empty, but still has those columns, when no document qualifies.
    """
    # NOTE(review): connection credentials are hard-coded in source; consider
    # loading them from configuration or the environment.
    # NOTE(review): 'SCmAM-SHe-8' is not a mechanism name PyMongo recognises
    # (e.g. 'SCRAM-SHA-1' / 'SCRAM-SHA-256'); presumably redacted -- confirm.
    client = MongoClient(host="127.0.0.1",
                         port=3717,
                         username='dataro',
                         password="kZk671112",
                         authSource='bigdata',
                         authMechanism='SCmAM-SHe-8',
                         replicaSet='mgset-5057112')
    try:
        collection = client[database][table_name]
        rows = []
        for doc in collection.find():
            row = _parse_record(doc)
            if row is not None:
                rows.append(row)
    finally:
        # The cursor has been fully consumed above, so closing is safe here.
        client.close()

    return pd.DataFrame(rows, columns=list(_FIELDS))

# Write
def to_mysql(dataframe):
    """Append *dataframe* to the MySQL table ``data_outer.applicant_mj``.

    The DataFrame index is written as well, because ``to_sql`` defaults to
    ``index=True``.

    :param dataframe: pandas DataFrame to append.
    """
    # The MySQL password is "hadoop@123". A raw "@" inside a URL password must
    # be percent-encoded ("%40") so the URL parser does not mistake it for the
    # credentials/host separator; SQLAlchemy unquotes it before connecting.
    engine = create_engine(
        'mysql+mysqldb://root:hadoop%40123@127.0.0.100:3306/data_outer?charset=utf8'
    )
    try:
        dataframe.to_sql("applicant_mj",
                         con=engine,
                         schema="data_outer",
                         if_exists="append")
    finally:
        # Release pooled connections held by this one-shot engine.
        engine.dispose()
# Delete
def delete_data():
    """Delete every row from the MySQL table ``data_outer.applicant_mj``.

    The delete is committed on success. The cursor and connection are closed
    even if ``execute`` or ``commit`` raises; the exception still propagates.
    """
    db = MySQLdb.connect(host="127.0.0.100",
                         user='root',
                         passwd='hadoop@123',
                         port=3306,
                         db='data_outer',
                         charset='utf8')
    try:
        cursor = db.cursor()
        try:
            cursor.execute("delete from data_outer.applicant_mj;")
        finally:
            cursor.close()
        db.commit()
    finally:
        db.close()


if __name__ == '__main__':
    # Full refresh of data_outer.applicant_mj from MongoDB.
    print("start")
    # Extract from MongoDB BEFORE clearing MySQL. If extraction fails, the
    # existing rows stay in place instead of the table being left empty.
    dataframe = find("data_outer", "outer_data")
    delete_data()
    print("delete_data end")
    to_mysql(dataframe)
    print("dataframe end")
    print("Finish!!!")

 

Python 连接 MongoDB，读取并解析 JSON 数据写入 MySQL

标签:outer   ase   for   解析json数据   ble   end   exe   continue   bigdata   

原文地址:https://www.cnblogs.com/RHadoop-Hive/p/9334136.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!