数据方案:
- 在Elasticsearch中通过code及time字段查询对应doc的mongo_id字段获得mongodb中的主键_id
- 通过获得id再进入mongodb进行查询
1,数据情况:
- 全部为股票及指数的分钟K线数据(股票代码区分度较高)
- Elasticsearch及mongodb都未分片且未优化参数配置
- mongodb数据量:
- Elasticsearch数据量:
2,将数据从mongo源库导入Elasticsearch
import time from pymongo import MongoClient from elasticsearch import Elasticsearch from elasticsearch.helpers import bulk es = Elasticsearch() conn = MongoClient(‘127.0.0.1‘, 27017) db = conn.kline_db my_set = db.min_kline x = 1 tmp = [] #此处有个坑mongo查询时由于数据量比较大时间较长需要设置游标不过期:no_cursor_timeout=True for i in my_set.find(no_cursor_timeout=True): x+=1 #每次插入100000条 if x%100000 == 99999: #es批量插入 success, _ = bulk(es, tmp, index=‘test_2‘, raise_on_error=True) print(‘Performed %d actions‘ % success) tmp = [] if i[‘market‘] == ‘sz‘: market = 0 else: market = 1 #此处有个秒数时间类型及时区转换 tmp.append({"_index":‘test_2‘,"_type": ‘kline‘,‘_source‘:{‘code‘:i[‘code‘],‘market‘:market, ‘time‘:time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(i[‘kline_time‘]/1000 - 8*60*60)) ,‘mongo_id‘:str(i[‘_id‘])}}) #将最后剩余在tmp中的数据插入 if len(tmp)>0: success, _ = bulk(es, tmp, index=‘test_2‘, raise_on_error=True) print(‘Performed %d actions‘ % success)
3,Elasticsearch+mongo查询时间统计
import time from pymongo import MongoClient from elasticsearch import Elasticsearch from elasticsearch.helpers import scan from bson.objectid import ObjectId #es连接 es = Elasticsearch() #mongo连接 conn = MongoClient(‘127.0.0.1‘, 27017) db = conn.kline_db #连接kline_db数据库,没有则自动创建 my_set = db.min_kline tmp = [] #计算运行时间装饰器 def cal_run_time(func): def wrapper(*args,**kwargs): start_time = time.time() res = func(*args,**kwargs) end_time = time.time() print(str(func) +‘---run time--- %s‘ % str(end_time-start_time)) return res return wrapper @cal_run_time def query_in_mongo(tmp_list): k_list = [] kline_data = my_set.find({‘_id‘:{‘$in‘:tmp_list}}) for k in kline_data: k_list.append(k) return k_list @cal_run_time def query_in_es(): #bool多条件查询 must相当于and body = { "query": { "bool": { "must": [{ "range": {#范围查询 "time": { "gte": ‘2017-01-10 00:00:00‘, # >= "lte": ‘2017-04-12 00:00:00‘ # <= } } }, {"terms": {# == 或 in:terms 精确查询 "code": [‘000002‘,‘000001‘] } } ] } } } #根据body条件记性查询 scanResp = scan(es, body, scroll="10m", index="test_2",doc_type="kline", timeout="10m") #解析结果字典并放入tmp列表中 for resp in scanResp: tmp.append(ObjectId(resp[‘_source‘][‘mongo_id‘])) print(len(tmp)) #--------------此处有个坑,直接使用search方法查询到的结果集中最多只有10条记录---------------- # zz = es.search(index="test_2", doc_type="kline", body=body) # print(zz[‘hits‘][‘total‘]) # for resp in zz[‘hits‘][‘hits‘]: # tmp.append(ObjectId(resp[‘_source‘][‘mongo_id‘])) query_in_es() query_in_mongo(tmp)
运行结果如下:
第一行:查询的doc个数:28320
第二行:es查询所用时间:0.36s
第三行:mongo使用_id查询所用时间 :0.34s
从结果来看对于3亿多数据的查询Elasticsearch的速度还是相当不错的
※Elasticsearch主要的优势在于可以进行分词模糊查询,所以股票K线并不是完全适应此场景。
※Elasticsearch+Mongo这个架构主要针对:使用mongo存储海量数据,且这张表更新频繁。