# Tags: sql turn post generate coding try asc orm final
"""
Adjust downstream.
"""
import csv
import psycopg2
import json
class IO_rw(object):
    """Traverse the industry chain stored in PostgreSQL and export the result.

    Nodes found by a traversal are recorded as 4-tuples
    ``(id, parent_id, kind, name)`` where ``kind`` is ``'root'`` for the
    starting node and ``'down'`` / ``'up'`` for nodes reached while walking
    downstream / upstream. ``read_down`` and ``read_up`` accumulate those
    tuples; callers reset them between independent traversals.
    """

    def __init__(self):
        # NOTE(review): connection settings and the output path are hard-coded;
        # opening the file in "w" mode truncates it on every construction.
        self.conn = psycopg2.connect(database="postgres", user="postgres", password="123456", host="127.0.0.1", port="5432")
        self.cur = self.conn.cursor()
        self.read_down = []
        self.datadict_down = {}
        self.read_up = []
        self.datadict_up = {}
        self.file = open("you_num.json", "w", encoding="utf-8")

    def process_item(self):
        """Return all first-level categories as a list of ``(id, name)`` pairs.

        First-level categories are the rows of ``baoji_industry_level`` whose
        ``pid`` is 0. Duplicate ids collapse to one entry (last name wins).
        """
        self.cur.execute("select id,name,pid from bjzs_big_data.baoji_industry_level where pid = 0")
        rows = self.cur.fetchall()
        # Avoid shadowing the builtin ``dict`` with a local name.
        names_by_id = {row[0]: row[1] for row in rows}
        return list(names_by_id.items())

    def sql_dowm(self, i):
        """Return the ids directly downstream of industry ``i``."""
        # Let the driver bind the value instead of formatting it into the SQL.
        self.cur.execute(
            "select downid from bjzs_big_data.baoji_industry_chain where upid = %s",
            (i,),
        )
        return [row[0] for row in self.cur.fetchall()]

    def sql_up(self, i):
        """Return the ids directly upstream of industry ``i``."""
        self.cur.execute(
            "select upid from bjzs_big_data.baoji_industry_chain where downid = %s",
            (i,),
        )
        return [row[0] for row in self.cur.fetchall()]

    def _record(self, seen, id, pid, kind):
        """Append the node tuple for ``id`` to ``seen``; return True if it was new."""
        name = self.getNameByid(id)
        node = (id, pid, 'root' if pid is None else kind, name)
        # An already recorded (id, parent, kind, name) tuple means this edge was
        # walked before; skipping it is what stops cycles from recursing forever.
        if node in seen:
            return False
        seen.append(node)
        return True

    def getAllDowm(self, id, pid):
        """Recursively record ``id`` and everything downstream of it in ``read_down``.

        ``pid`` is the node we arrived from, or ``None`` for the traversal root.
        """
        if self._record(self.read_down, id, pid, 'down'):
            for down in self.sql_dowm(id):
                self.getAllDowm(down, id)

    def getAllUp(self, id, pid):
        """Recursively record ``id`` and everything upstream of it in ``read_up``.

        ``pid`` is the node we arrived from, or ``None`` for the traversal root.
        """
        if self._record(self.read_up, id, pid, 'up'):
            for up in self.sql_up(id):
                self.getAllUp(up, id)

    def getNameByid(self, id):
        """Return the name of industry ``id``, or ``None`` when no row matches."""
        self.cur.execute(
            "select name from bjzs_big_data.baoji_industry_level where id = %s",
            (id,),
        )
        rows = self.cur.fetchall()
        # fetchall() yields an empty list (not None) when nothing matches, so
        # test for emptiness explicitly instead of swallowing an IndexError.
        if not rows:
            return None
        return rows[0][0]

    def write_json(self, data):
        """Echo ``data`` and append it to the output file as one JSON line.

        ``data`` must be accepted by ``dict()`` (a mapping or key/value pairs).
        """
        print(data)
        content = json.dumps(dict(data), ensure_ascii=False) + "\n"
        self.file.write(content)

    def get_key(self, data):
        """Convert ``(id, parent_id, kind, name)`` tuples into a list of dicts.

        Each dict has the keys ``id``, ``parentid``, ``type`` and ``topic``.
        """
        return [
            {'id': t[0], 'parentid': t[1], 'type': t[2], 'topic': t[3]}
            for t in data
        ]
if __name__ == '__main__':
    # For the first five first-level categories, print the full downstream
    # and upstream chain of each one.
    r = IO_rw()
    try:
        li = r.process_item()
        print(li)
        for i in li[:5]:
            print('=====================', i[0])
            r.getAllDowm(i[0], None)
            # Each recorded tuple is: 1) the industry id, 2) the node it is
            # up/downstream of, 3) whether it is upstream or downstream,
            # 4) its name.
            list_down = r.get_key(r.read_down)
            print("下游有:", list_down)
            r.getAllUp(i[0], None)
            list_up = r.get_key(r.read_up)
            print(" 上游有:", list_up)
            # Start the next category with empty result lists.
            r.read_down = []
            r.read_up = []
    finally:
        # Release the cursor, connection and output file even if a query fails.
        r.cur.close()
        r.conn.close()
        r.file.close()
# Tags: sql turn post generate coding try asc orm final
# Original article: https://www.cnblogs.com/yuanjia8888/p/10175220.html