标签:outer ase for 解析json数据 ble end exe continue bigdata
# -*- coding: utf-8 -*-
"""Copy "applicant_mujin" records from MongoDB into MySQL.

The script reads every document of a MongoDB collection, decodes the JSON
strings stored under ``applicant_mujin`` (``req_param`` and ``response``),
flattens the interesting fields into one row per document and appends the
rows to the MySQL table ``data_outer.applicant_mj`` after emptying it.

NOTE(review): hosts, user names and passwords are hard-coded below exactly as
in the original script; they should be moved to configuration or environment
variables before this is used outside a sandbox.
"""
import json
import sys
from contextlib import closing

import pandas as pd

if sys.version_info[0] == 2:
    # Python 2 only: the original script forced UTF-8 as the implicit
    # str/unicode codec. Python 3 has no equivalent and does not need one.
    reload(sys)  # noqa: F821 - builtin on Python 2
    sys.setdefaultencoding("utf-8")

# Key of the sub-document that carries the request/response payload.
SOURCE_KEY = "applicant_mujin"

# Placeholder stored when a top-level field is absent from a document.
MISSING = "-"

# Output columns, in the order they are written to the DataFrame.
FIELD_NAMES = (
    "id",
    "credit_id",
    "source_id",
    "req_url",
    "resp_time",
    "bank_card",
    "idCard",
    "mobile",
    "name",
    "result",
    "guid",
    "message",
    "status",
    "credit_risk_level",
    "data_flag_id",
    "data_flag_phone",
)


def parse_record(document):
    """Flatten one MongoDB document into a row, or return None to skip it.

    A document is skipped (None is returned) when it has no usable
    ``applicant_mujin`` sub-document, when that sub-document lacks
    ``req_param`` or ``response``, or when the decoded response has no
    ``applicant`` entry under ``body.content``.

    Args:
        document: mapping as returned by ``collection.find()``.

    Returns:
        A dict keyed by ``FIELD_NAMES``, or None when the document is skipped.

    Raises:
        ValueError: if ``req_param`` or ``response`` is not valid JSON.
        KeyError: if a decoded payload lacks one of the expected fields.
    """
    payload = document.get(SOURCE_KEY, MISSING)
    if not isinstance(payload, dict):
        return None

    raw_request = payload.get("req_param")
    raw_response = payload.get("response")
    if raw_request is None or raw_response is None:
        return None

    request = json.loads(raw_request)
    response = json.loads(raw_response)

    body = response["body"]
    content = body["content"]
    if "applicant" not in content:
        return None
    applicant = content["applicant"]

    # The row is built in one go so that every column always receives exactly
    # one value per kept document; unequal column lengths would make the
    # DataFrame construction fail.
    return {
        "id": document.get("_id", MISSING),
        "credit_id": document.get("credit_id", MISSING),
        "source_id": document.get("source_id", MISSING),
        "req_url": payload.get("req_url"),
        "resp_time": payload.get("resp_time"),
        "bank_card": request["bank_card"],
        "idCard": request["idCard"],
        "mobile": request["mobile"],
        "name": request["name"],
        "result": body["result"],
        "guid": response["guid"],
        "message": response["message"],
        "status": response["status"],
        "credit_risk_level": applicant["credit_risk_level"],
        "data_flag_id": applicant["data_flag_id"],
        "data_flag_phone": applicant["data_flag_phone"],
    }


def build_dataframe(documents):
    """Return a DataFrame with one row per parseable document.

    Args:
        documents: iterable of MongoDB documents (mappings).

    Returns:
        pandas.DataFrame with the columns listed in ``FIELD_NAMES``; it has
        zero rows when no document qualifies.
    """
    parsed = (parse_record(document) for document in documents)
    rows = [row for row in parsed if row is not None]
    return pd.DataFrame(rows, columns=list(FIELD_NAMES))


def find(database, table_name):
    """Read a MongoDB collection and return its flattened records.

    Args:
        database: name of the MongoDB database.
        table_name: name of the collection inside ``database``.

    Returns:
        pandas.DataFrame built by ``build_dataframe`` from every document of
        the collection.
    """
    # Imported here rather than at module level so the parsing helpers stay
    # importable on machines without the database drivers installed.
    from pymongo import MongoClient

    client = MongoClient(
        host="127.0.0.1",
        port=3717,
        username="dataro",
        password="kZk671112",
        authSource="bigdata",
        # NOTE(review): the original value "SCmAM-SHe-8" is not a mechanism
        # PyMongo accepts; "SCRAM-SHA-1" is the closest valid name. Confirm
        # against the server configuration.
        authMechanism="SCRAM-SHA-1",
        replicaSet="mgset-5057112",
    )
    try:
        collection = client[database][table_name]
        return build_dataframe(collection.find())
    finally:
        # Release the connection pool even when parsing fails.
        client.close()


def to_mysql(dataframe):
    """Append ``dataframe`` to the MySQL table ``data_outer.applicant_mj``.

    Args:
        dataframe: pandas.DataFrame whose columns match the target table.
    """
    from sqlalchemy import create_engine

    # The password is "hadoop@123"; the "@" must be percent-encoded (%40),
    # otherwise the URL parser treats it as the user-info/host separator.
    engine = create_engine(
        "mysql+mysqldb://root:hadoop%40123@127.0.0.100:3306/data_outer"
        "?charset=utf8"
    )
    try:
        dataframe.to_sql(
            "applicant_mj",
            con=engine,
            schema="data_outer",
            if_exists="append",
        )
    finally:
        engine.dispose()


def delete_data():
    """Delete every row of the MySQL table ``data_outer.applicant_mj``."""
    import MySQLdb

    connection = MySQLdb.connect(
        host="127.0.0.100",
        user="root",
        passwd="hadoop@123",
        port=3306,
        db="data_outer",
        charset="utf8",
    )
    # closing() guarantees cursor and connection are released on any error.
    with closing(connection):
        with closing(connection.cursor()) as cursor:
            cursor.execute("delete from data_outer.applicant_mj;")
        connection.commit()


def main():
    """Run the full refresh: extract from MongoDB, then replace MySQL rows."""
    print("start")
    # Extract first: if MongoDB is unreachable or a document is malformed,
    # the existing MySQL rows are left untouched instead of being wiped.
    dataframe = find("data_outer", "outer_data")
    delete_data()
    print("delete_data end")
    to_mysql(dataframe)
    print("dataframe end")
    print("Finish!!!")


if __name__ == "__main__":
    main()
Python 连接 MongoDB，读取并解析 JSON 数据后写入 MySQL
标签:outer ase for 解析json数据 ble end exe continue bigdata
原文地址:https://www.cnblogs.com/RHadoop-Hive/p/9334136.html