标签:ted cursor sig where port amp overview url find
脱敏后脚本
projectapi.py: 项目接口类
# -*- coding:utf-8 -*- """ xx项目接口类 2018-11 dinghanhua """ import requests import re import pymssql #region 工具类函数 def findinfo_from_apiquerystockdetailinfo(str1,str2): """ 从str1中找第一个"str2":...后面的值 :param str1: :param str2: :return: str2对应的值 """ pattern1 = ‘"‘+str2 + ‘":(.*?),"‘ #左右边界 result = re.search(pattern1, str1) #正则匹配 if result: result = result.group(1).replace(‘"‘,‘‘) return result def get_last_value_of_key(resultlist,key): ‘‘‘ 从二维数组取第一行的元素对应的最后一行的值 :param resultlist: :param key: :return: value ‘‘‘ for i in range(0,len(resultlist[0])): if key == resultlist[0][i]: #第一行中找到对应字段名的索引 result = resultlist[-1][i] return result #返回数组最后一行对应的值 def round_test(data,i=0): ‘‘‘ 四舍五入,解决round(7.35)=7.3的问题 :param data: :param i: 保留的位数,默认保留一位小数 :return: ‘‘‘ if isinstance(i,int): #i是整数 raise Exception(‘the second param must be int‘) else: mi = 10**i f = data*mi - int(data*mi) if f >=0.5: res = (int(data*mi)+1)/mi elif f <=-0.5: res = (int(data*mi-1))/mi else: res = int(data*mi)/mi if i<=0: res = int(res) return res # endregion class ProjectApi: def api_querystockdetailinfo(self,stockcode): """ 请求并提取股票基本信息接口数据 :param stockcode: :return: 截取信息dict """ api = ‘http://testdomain/querystockdetailinfo?stockcode={stockcode}‘.format(stockcode = stockcode) response = requests.get(api) result = response.text.replace(r‘\n‘,‘‘).replace(‘\\‘, ‘‘) # 去掉特殊字符\n,\ result_dict = {‘stockcode‘: stockcode} #股票名称 result_dict[‘StockName‘] = findinfo_from_apiquerystockdetailinfo(result, ‘StockName‘) if result_dict[‘StockName‘]: #股票名称存在继续处理其他字段,否则报错并返回 # 公司概要 #剔除公司概要中“公司”“公司是”、“公司是一家”高度重复的内容 overviewvalue = result_dict[‘OverviewValue‘] = findinfo_from_apiquerystockdetailinfo(result, ‘OverviewValue‘) if overviewvalue.startswith(‘公司是一家‘): result_dict[‘OverviewValue‘] = overviewvalue[5:] elif overviewvalue.startswith(‘公司是‘): result_dict[‘OverviewValue‘] = overviewvalue[3:] elif overviewvalue.startswith(‘公司‘): result_dict[‘OverviewValue‘] = overviewvalue[2:] if not overviewvalue.endswith(‘。‘): #判断最后是否有句号,没有加一个 result_dict[‘OverviewValue‘] += ‘。‘ # 市值 typecap = findinfo_from_apiquerystockdetailinfo(result, ‘TypeCap‘) dictcap = {‘1‘: ‘巨盘‘, ‘2‘: ‘大盘‘, ‘3‘: ‘中盘‘, ‘4‘: ‘小盘‘, ‘5‘: ‘微盘‘} result_dict[‘TypeCap‘] = dictcap[typecap] # 风格 TypeStyle = result_dict[‘TypeStyle‘] = findinfo_from_apiquerystockdetailinfo(result, ‘TypeStyle‘) dictstyle = {‘1‘: ‘成长‘, ‘2‘: ‘价值‘, ‘3‘: ‘周期‘, ‘4‘: ‘题材‘, ‘5‘: ‘高价值‘} if len(TypeStyle) == 1: result_dict[‘TypeStyle‘] = dictstyle[TypeStyle] elif len(TypeStyle) >1: typestylelist = TypeStyle.split(‘,‘) #风格可能有多个 for t in range(len(typestylelist)): typestylelist[t] = dictstyle[typestylelist[t]] result_dict[‘TypeStyle‘] = ‘、‘.join(typestylelist) # 生命周期 LifecycleValue 生命周期(单选,例:1);(1初创期、2成长期、3成熟期、4衰退期)") LifecycleValue = findinfo_from_apiquerystockdetailinfo(result, ‘LifecycleValue‘) dictlifecycle = {‘1‘: ‘初创期‘, ‘2‘: ‘成长期‘, ‘3‘: ‘成熟期‘, ‘4‘: ‘衰退期‘, ‘5‘: ‘周期底部‘, ‘6‘: ‘周期顶部‘, ‘7‘: ‘周期向下‘, ‘8‘: ‘周期向上‘} if LifecycleValue: result_dict[‘LifecycleValue‘] = dictlifecycle[LifecycleValue] # 估值 ScoreTTM 估值(分值1~5,>=4 偏低, <=2 偏高,其他适中)") ScoreTTM = findinfo_from_apiquerystockdetailinfo(result, ‘ScoreTTM‘) if ScoreTTM: if float(ScoreTTM) >= 4: result_dict[‘ScoreTTM‘] = ‘偏低‘ elif ScoreTTM and float(ScoreTTM) <= 2: result_dict[‘ScoreTTM‘] = ‘偏高‘ else: result_dict[‘ScoreTTM‘] = ‘适中‘ # 成长指数 ScoreGrowing 成长指数(分值1~5,>=4 高, <=2 低,其他中)‘) ScoreGrowing = findinfo_from_apiquerystockdetailinfo(result, ‘ScoreGrowing‘) if ScoreGrowing: if float(ScoreGrowing) >= 4: result_dict[‘ScoreGrowing‘] = ‘高‘ elif float(ScoreGrowing) <= 2: result_dict[‘ScoreGrowing‘] = ‘低‘ else: result_dict[‘ScoreGrowing‘] = ‘中‘ else: result_dict[‘ScoreGrowing‘]=‘‘ # 盈利能力 ScoreProfit = findinfo_from_apiquerystockdetailinfo(result, ‘ScoreProfit‘) # ‘ ScoreProfit 盈利能力(分值1~5,>=4 高, <=2 低,其他中)‘ ) if ScoreProfit: if float(ScoreProfit) >= 4: result_dict[‘ScoreProfit‘] = ‘高‘ elif float(ScoreProfit) <= 2: result_dict[‘ScoreProfit‘] = ‘低‘ else: result_dict[‘ScoreProfit‘] = ‘中‘ else: result_dict[‘ScoreProfit‘]=‘‘ return result_dict def api_finance(self,stockcode): """ 请求并提取财务数据 :param stockcode: :return: dict """ api = ‘http://testdomain/finance?stockcode={stockcode}‘.format(stockcode = stockcode) response = requests.get(api) response.encoding = ‘utf-8-sig‘ result = response.json()[‘data‘][0][‘result‘] # 转化为二位数组 result_dict = {‘stockcode‘: stockcode} #存储返回结果 if len(result) <3: #说明股票没数据 return result_dict # 当期报告期 result_dict[‘EndDate‘] = get_last_value_of_key(result, ‘EndDate‘) # 预测收益报告期 ReportPeriod = get_last_value_of_key(result, ‘ReportPeriod‘) dictreportperoid = {‘03/31‘: ‘一季度‘, ‘06/30‘: ‘上半年‘, ‘09/30‘: ‘前三季度‘, ‘12/31‘: ‘本年度‘} if ReportPeriod and ReportPeriod != result_dict[‘EndDate‘]: #预测收益报告期不等于当期报告期 ReportPeriod = get_last_value_of_key(result, ‘ReportPeriod‘)[5:10] result_dict[‘ReportPeriod‘] = dictreportperoid[ReportPeriod] else: result_dict[‘ReportPeriod‘] = ‘‘ # 预测业绩情况 PerformanceType = get_last_value_of_key(result, ‘PerformanceType‘) result_dict[‘PerformanceType‘] = PerformanceType # 预测业绩比例 result_dict[‘PerformanceTypeRange‘] = get_last_value_of_key(result, ‘PerformanceTypeRange‘) # 营业收入增长率 result_dict[‘OperatingRevenueYoY‘] = get_last_value_of_key(result, ‘OperatingRevenueYoY‘) # 营业利润增长率 result_dict[‘NetProfitYoY‘] = get_last_value_of_key(result, ‘NetProfitYoY‘) # 净资产收益率 result_dict[‘ROE‘] = get_last_value_of_key(result, ‘ROE‘) # 毛利率 result_dict[‘SalesGrossMargin‘] = get_last_value_of_key(result, ‘SalesGrossMargin‘) return result_dict def api_quote(self,stockcode): """ 请求并提取PETTM :param stockcode: :return: dict """ api = ‘http://testdomain/quote?stockcode={stockcode}‘.format(stockcode=stockcode) response = requests.get(api) response.encoding = ‘utf-8-sig‘ result = response.json()[‘data‘][0][‘result‘] # 转化为二位数组 result_dict = {‘stockcode‘:stockcode} if len(result) <3: #说明股票没数据 return result_dict result_dict[‘PETTM‘] = get_last_value_of_key(result, ‘PE1‘) return result_dict def result_of_3sourceapi(self,stockcode): """ 拼接3个接口取出的字串 :param stockcode: :return: """ result_stockdetailinfo = self.api_querystockdetailinfo(stockcode) if result_stockdetailinfo[‘StockName‘]: #如果股票名称存在,执行后续步骤 result_finance = self.api_finance(stockcode) result_quote = self.api_quote(stockcode) #显示三个接口结果 #print(result_stockdetailinfo) #print(result_finance) #print(result_quote) #空值、精度处理 OperatingRevenueYoY = round_test(float(result_finance[‘OperatingRevenueYoY‘]),1) NetProfitYoY = round_test(float(result_finance[‘NetProfitYoY‘]),1) if result_finance[‘ReportPeriod‘] ==‘‘ or result_finance[‘PerformanceType‘] == ‘‘ or result_finance[‘PerformanceTypeRange‘] == ‘‘: ReportPeriod = PerformanceType = PerformanceTypeRange = ‘‘ else: ReportPeriod = ‘,预计‘ + result_finance[‘ReportPeriod‘] PerformanceType = ‘业绩‘ + result_finance[‘PerformanceType‘] PerformanceTypeRange = result_finance[‘PerformanceTypeRange‘] if result_finance[‘ROE‘]: ROE = ‘,净资产收益率:{0}%‘.format(round_test(float(result_finance[‘ROE‘]),1)) else: ROE = ‘‘ if result_finance[‘SalesGrossMargin‘]: SalesGrossMargin = ‘,毛利率:{0}%‘.format(round_test(float(result_finance[‘SalesGrossMargin‘]),1)) else: SalesGrossMargin = ‘‘ result = ‘{OverviewValue} {TypeCap}{TypeStyle}股,处于{LifecycleValue}。‘ ‘估值{ScoreTTM},PE(TTM):{PETTM};‘ ‘成长性{ScoreGrowing},当期营收增长:{OperatingRevenueYoY}%,当期利润增长:{NetProfitYoY}%;‘ ‘盈利能力{ScoreProfit}{ROE}{SalesGrossMargin}{ReportPeriod}{PerformanceType}{PerformanceTypeRange}。‘ .format(OverviewValue = result_stockdetailinfo[‘OverviewValue‘], TypeCap = result_stockdetailinfo[‘TypeCap‘], TypeStyle = result_stockdetailinfo[‘TypeStyle‘], LifecycleValue = result_stockdetailinfo[‘LifecycleValue‘], ScoreTTM = result_stockdetailinfo[‘ScoreTTM‘], PETTM = round_test(float(result_quote[‘PETTM‘])), ScoreGrowing = result_stockdetailinfo[‘ScoreGrowing‘], OperatingRevenueYoY = OperatingRevenueYoY, NetProfitYoY = NetProfitYoY, ScoreProfit = result_stockdetailinfo[‘ScoreProfit‘], ROE = ROE, SalesGrossMargin=SalesGrossMargin, ReportPeriod = ReportPeriod, PerformanceType = PerformanceType, PerformanceTypeRange = PerformanceTypeRange) return result else: return ‘不存在该股票数据‘ def api_of_dev(self,stockcodelist,cookie,page=0,domain=‘testdomain.cn‘): """ 获取开发接口数据 :param 股票列表;cookie;domain默认线上 :return: 股票代码及数据 """ headers = {‘Cookie‘:cookie} url = ‘http://{domain}/getstockbypage?StockCodeList={stockcodelist}&PageIndex={page}&PageSize=10‘.format(stockcodelist = stockcodelist,domain = domain,page=page) response = requests.get(url, headers=headers) jsonstr = response.json()# 转成json,取出message message = jsonstr[‘Message‘] dict_result = {} if message: for ele in message: stockcode = ele[‘StockCode‘] content = ele[‘Content‘] # 实际结果 nickname = ele[‘NickName‘] #发布者 if nickname == ‘project000‘: dict_result[stockcode] = content return dict_result def compare_vs_devapi(self,stockcodelist,cookie,page=0,domain=‘testdomain.cn‘): """ 开发接口数据与接口拼接数据对比 :return: bool """ diff_list = [] # 存储不一致的股票代码 resultofdev = self.api_of_dev(stockcodelist,cookie,page,domain) #请求开发接口 if resultofdev: #如果开发结果不为空 for stock,actual in resultofdev.items(): expected = self.result_of_3sourceapi(stock) #数据源拼接结果 ‘‘‘去掉pe(ttm)对比‘‘‘ beginindex = actual.find(‘PE(TTM)‘) endindex = actual.find(‘;‘, beginindex) actual_result = actual[:beginindex] + actual[endindex:] beginindex = expected.find(‘PE(TTM)‘) endindex = expected.find(‘;‘, beginindex) expected_result = expected[:beginindex] + expected[endindex:] if actual_result != expected_result: #预期实际对比 print(stock) print(‘开发:‘,actual_result) print(‘预期:‘,expected_result) diff_list.append(stock) else: print(stock,‘一致(不含PETTM)‘) if diff_list: #异常股票列表不为空则输出;空则提示全部一致 print(‘不一致的股票列表:‘, diff_list) return False else: print(‘对比结果:数据一致‘) return True else: print(‘接口没有数据‘) return True def compare_vs_database(self,count=10): """ 比较数据库数据与数据源拼接字串 :param count:对比股票数量,default=10 :return:True 一致;False 不一致 """ diff_list = [] # 存储不一致的股票代码 with pymssql.connect(server=‘192.168.1.1‘, user=‘sa‘, password=‘sa‘, database=‘test_db‘) as myconnect: with myconnect.cursor(as_dict=True) as cursor: cursor.execute("""SELECT top {count} StockCode,StockName,content FROM [test_db].[dbo].[table] where NickName =‘project000‘ and isvalid = 1 and IsDeleted =0 order by createtime desc""".format(count=count)) for row in cursor: stockcode = row[‘StockCode‘] actual = row[‘content‘] expected = self.result_of_3sourceapi(stockcode) # 数据源拼接结果 ‘‘‘去掉pe(ttm)对比‘‘‘ beginindex = actual.find(‘PE(TTM)‘) endindex = actual.find(‘;‘, beginindex) actual_result = actual[:beginindex] + actual[endindex:] beginindex = expected.find(‘PE(TTM)‘) endindex = expected.find(‘;‘, beginindex) expected_result = expected[:beginindex] + expected[endindex:] if actual_result != expected_result: # 预期实际对比 print(stockcode) print(‘开发:‘, actual_result) print(‘预期:‘, expected_result) diff_list.append(stockcode) else: print(stockcode, ‘一致(不含PETTM)‘) if diff_list: print(‘不一致的股票列表:‘, diff_list) return False else: print(‘对比结果:数据全部一致‘) return True
run_test.py 执行脚本:
# coding:utf-8 """ 接口测试执行脚本 """ import projectapi import unittest class ApiTest(unittest.TestCase): def setUp(self): self.projectapi1 = projectapi.ProjectApi() # 创建接口对象 def testcase1(self): """与开发接口比对""" stockcodelist = ‘600000%2C600128%2C600146%2C600165%2C600186‘ #通过抓包获取cookie cookie = ‘globalid=24A85DEC-AF25-36DD-C774-ED092F705767‘ testresult = self.projectapi1.compare_vs_devapi(stockcodelist,cookie) self.assertTrue(testresult) def testcase2(self): # 与数据库对比 testresult = self.projectapi1.compare_vs_database(count=10) self.assertTrue(testresult) def testcase3(self): """手工查询原数据和拼接字串""" while True: stockcode = input(‘输入股票代码: ‘) if stockcode.isdigit() and len(stockcode)==6 and stockcode[:2] in (‘00‘,‘60‘,‘30‘): print(‘数据请求中.....‘) print(self.projectapi1.api_quote(stockcode)) print(self.projectapi1.api_finance(stockcode)) print(self.projectapi1.api_querystockdetailinfo(stockcode)) print(self.projectapi1.result_of_3sourceapi(stockcode)) else: print(‘股票代码有误‘)
标签:ted cursor sig where port amp overview url find
原文地址:https://www.cnblogs.com/dinghanhua/p/10376705.html