"""Connect to Oracle and count the NULL values of every column of a table."""

from collections import Counter

import numpy as np
import pandas as pd

try:
    import cx_Oracle as orcl
except ImportError:
    # The SQL-building helpers below do not need the driver, so keep the
    # module importable without it; ConnectOracle() reports the missing
    # driver as soon as a connection object is requested.
    orcl = None


class ConnectOracle:
    """Small wrapper around cx_Oracle that opens one connection per statement.

    Example connection parameters: user ``scott``, port ``1521``, SID
    ``oracle``.  The instance name can be looked up in sqlplus with
    ``select instance_name from V$instance;``.
    """

    def __init__(self, username, passwd, host, port='1521', sid='oracle'):
        """Store the login data and build the DSN (data source name).

        Prints the Oracle client version as a quick sanity check.

        Raises:
            ImportError: if cx_Oracle is not installed.
        """
        if orcl is None:
            raise ImportError('cx_Oracle is required to use ConnectOracle')
        self.login = {}
        self.db = None
        self.cursor = None
        self.login['username'] = username
        self.login['passwd'] = passwd
        self.login['dsn'] = orcl.makedsn(host, port, sid)
        print(orcl.clientversion())

    def connect_oracle(self):
        """Open a connection and a cursor.

        Returns:
            True on success; False (after printing a message) on failure.
        """
        try:
            # Log in to the database.
            self.db = orcl.connect(
                self.login['username'],
                self.login['passwd'],
                self.login['dsn'],
            )
            # Transactions are committed explicitly by the callers.
            self.db.autocommit = False
            self.cursor = self.db.cursor()
        except Exception as exc:
            print('can not connect oracle', exc)
            # Release a connection that was opened before the failure.
            self.close_oracle()
            return False
        return True

    def close_oracle(self):
        """Close the cursor and the connection; safe to call repeatedly."""
        cursor, self.cursor = self.cursor, None
        db, self.db = self.db, None
        try:
            if cursor is not None:
                cursor.close()
        finally:
            if db is not None:
                db.close()

    def select_oracle(self, sql, size=0, params=None):
        """Run a query and return the rows as a DataFrame.

        Args:
            sql: the SELECT statement, optionally containing bind variables
                such as ``:deptno``.
            size: maximum number of rows to fetch; 0 fetches all rows.
            params: bind values for ``sql``, e.g. ``{'deptno': 30}``.

        Returns:
            A DataFrame whose columns are named after the cursor
            description, or False when the connection could not be opened.
        """
        if not self.connect_oracle():
            return False
        try:
            if params:
                # A query takes a single set of bind values, so it goes
                # through execute(); executemany() is the batch-DML call.
                self.cursor.execute(sql, params)
            else:
                self.cursor.execute(sql)
            if size:
                content = self.cursor.fetchmany(size)
            else:
                content = self.cursor.fetchall()
            # Column names as reported by the driver.
            columns = [entry[0] for entry in self.cursor.description]
        finally:
            # Always release the connection, even when the query fails.
            self.close_oracle()
        return pd.DataFrame(content, columns=columns)

    def insert_oracle(self, sql, data=None):
        """Execute a DML statement and commit it.

        Args:
            sql: the statement, e.g. ``insert into demo(v) values(:1)``.
            data: optional sequence of rows (``[(), ...]``) for a batch
                insert via executemany(); when empty the statement is
                executed once without bind values.

        Failures are reported on stdout and rolled back; nothing is raised
        for an ordinary database error.
        """
        if not self.connect_oracle():
            return
        try:
            if data:
                self.cursor.executemany(sql, data)
            else:
                self.cursor.execute(sql)
            # Commit only when the statement succeeded.
            self.db.commit()
        except Exception as exc:
            print("insert异常", exc)
            self.db.rollback()
        finally:
            self.close_oracle()


def _build_where_clause(where_condition):
    """Turn ``where_condition`` into ``(clause, binds)``.

    A string is used verbatim after ``where``.  A dict becomes
    ``t.<key> = :wN`` predicates joined by ``and``, with the values returned
    as bind variables; a value of None becomes ``t.<key> is null``.
    """
    if not where_condition:
        return '', {}
    if isinstance(where_condition, str):
        return ' where ' + where_condition, {}
    if not isinstance(where_condition, dict):
        raise TypeError('where_condition must be a str or a dict')

    predicates = []
    binds = {}
    for column, value in where_condition.items():
        if value is None:
            predicates.append('t.{} is null'.format(column))
            continue
        if isinstance(value, np.generic):
            # Hand the driver a plain Python scalar.
            value = value.item()
        bind_name = 'w{}'.format(len(binds))
        predicates.append('t.{} = :{}'.format(column, bind_name))
        binds[bind_name] = value
    return ' where ' + ' and '.join(predicates), binds


def build_missing_count_sql(table_name, column_names, where_condition=None):
    """Build the NULL-counting query for ``table_name``.

    Args:
        table_name: table to inspect (inserted into the SQL as an identifier).
        column_names: iterable of column names to count NULLs for.
        where_condition: optional filter, a SQL string or a dict of
            ``column -> value`` equality tests.

    Returns:
        ``(sql, binds)`` where ``sql`` selects ``count(*) as counts`` plus one
        ``sum(decode(col, null, 1, 0))`` per column, and ``binds`` holds the
        bind values referenced by the where clause.
    """
    select_items = ['count(*) as counts']
    select_items.extend(
        'sum(decode({0}, null, 1, 0)) as {0}'.format(col)
        for col in column_names
    )
    where_clause, binds = _build_where_clause(where_condition)
    sql = 'select {} from {} t{}'.format(
        ', '.join(select_items), table_name, where_clause
    )
    return sql, binds


def missing_count(table_name, where_condition=None, **engine):
    """Count the NULL values of every column of ``table_name``.

    Args:
        table_name: table to inspect; it is matched against
            ``user_tab_columns.table_name`` as given (Oracle normally stores
            unquoted names in upper case).
        where_condition: optional row filter, a SQL string or a dict of
            ``column -> value`` equality tests.
        **engine: keyword arguments forwarded to ConnectOracle.

    Returns:
        A Series indexed by ``COUNTS`` (total rows) and the column names,
        holding the number of NULLs per column.

    Raises:
        ConnectionError: if the database connection cannot be opened.
    """
    db = ConnectOracle(**engine)
    columns = db.select_oracle(
        sql='select column_name from user_tab_columns '
            'where table_name = :table_name',
        params={'table_name': table_name},
    )
    if columns is False:
        raise ConnectionError('could not connect to Oracle')

    sql_select, binds = build_missing_count_sql(
        table_name, columns.COLUMN_NAME, where_condition
    )
    res = db.select_oracle(sql=sql_select, params=binds)
    if res is False:
        raise ConnectionError('could not connect to Oracle')
    return pd.Series(res.values.tolist()[0], index=res.columns)


config = {
    'username': 'qmcbrt',
    'passwd': 'qmcbrt',
    'host': 'localhost',
    'port': '1521',
    'sid': 'tqmcbdb',
}

where_condition = {
    'is_normal': 1,
    'is_below_16': 0,
    'is_xs': 0,
    'is_cj': 0,
    'is_dead': 0,
    'AAE138_is_not_null': 0,
    'is_dc': 0,
    'is_px': 0,
}


def main():
    """Print the NULL counts of table QMCB_KM_2019_1_31_1."""
    # NOTE: an earlier module-level call passed an undefined ``sql_select``
    # to select_oracle() and raised NameError; it is intentionally omitted.
    print(missing_count('QMCB_KM_2019_1_31_1', where_condition, **config))


if __name__ == '__main__':
    main()
# Python: connect to Oracle and count the missing values of every column in a given table
