码迷,mamicode.com
首页 > 其他好文 > 详细

Elasticsearch+Mongo亿级别数据导入及查询实践

时间:2018-01-28 16:25:59      阅读:882      评论:0      收藏:0      [点我收藏+]

标签:tag   arch   turn   查询   raise   批量插入   图片   func   doc   

数据方案:
  • 在Elasticsearch中通过code及time字段查询对应doc的mongo_id字段获得mongodb中的主键_id
  • 通过获得id再进入mongodb进行查询
 
1,数据情况:
  • 全部为股票及指数的分钟K线数据(股票代码区分度较高)
  • Elasticsearch及mongodb都未分片且未优化参数配置
  • mongodb数据量:

    技术分享图片

  • Elasticsearch数据量:

    技术分享图片

2,将数据从mongo源库导入Elasticsearch

import time
from pymongo import MongoClient
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk
es = Elasticsearch()

conn = MongoClient(127.0.0.1, 27017)
db = conn.kline_db
my_set = db.min_kline
x = 1
tmp = []

#此处有个坑mongo查询时由于数据量比较大时间较长需要设置游标不过期:no_cursor_timeout=True
for i in my_set.find(no_cursor_timeout=True):
    x+=1
    #每次插入100000条
    if x%100000 == 99999:
        #es批量插入
        success, _ = bulk(es, tmp, index=test_2, raise_on_error=True)
        print(Performed %d actions % success)
        tmp = []
    if i[market] == sz:
        market = 0
    else:
        market = 1
    #此处有个秒数时间类型及时区转换
    tmp.append({"_index":test_2,"_type": kline,_source:{code:i[code],market:market,                time:time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(i[kline_time]/1000 - 8*60*60))                ,mongo_id:str(i[_id])}})

#将最后剩余在tmp中的数据插入
if len(tmp)>0:
    success, _ = bulk(es, tmp, index=test_2, raise_on_error=True)
    print(Performed %d actions % success)

3,Elasticsearch+mongo查询时间统计

import time
from pymongo import MongoClient
from elasticsearch import Elasticsearch
from elasticsearch.helpers import scan
from bson.objectid import ObjectId

#es连接
es = Elasticsearch()

#mongo连接
conn = MongoClient(127.0.0.1, 27017)
db = conn.kline_db  #连接kline_db数据库,没有则自动创建
my_set = db.min_kline

tmp = []

#计算运行时间装饰器
def cal_run_time(func):
    def wrapper(*args,**kwargs):
        start_time = time.time()
        res = func(*args,**kwargs)
        end_time = time.time()
        print(str(func) +---run time--- %s % str(end_time-start_time))
        return res
    return wrapper

@cal_run_time
def query_in_mongo(tmp_list):
    k_list = []
    kline_data = my_set.find({_id:{$in:tmp_list}})
    for k in kline_data:
        k_list.append(k)
    return k_list

@cal_run_time
def query_in_es():
    #bool多条件查询 must相当于and
    body = {
        "query": {
            "bool": {
                "must": [{
                    "range": {#范围查询
                        "time": {
                            "gte": 2017-01-10 00:00:00,  # >=
                            "lte": 2017-04-12 00:00:00  # <=
                        }
                    }
                },
                    {"terms": {# == 或  in:terms 精确查询
                        "code": [000002,000001]
                    }
                    }
                ]
            }

        }
    }

    #根据body条件记性查询
    scanResp = scan(es, body, scroll="10m", index="test_2",doc_type="kline", timeout="10m")

    #解析结果字典并放入tmp列表中
    for resp in scanResp:
        tmp.append(ObjectId(resp[_source][mongo_id]))

    print(len(tmp))

    #--------------此处有个坑,直接使用search方法查询到的结果集中最多只有10条记录----------------
    # zz = es.search(index="test_2", doc_type="kline", body=body)
    # print(zz[‘hits‘][‘total‘])
    # for resp in zz[‘hits‘][‘hits‘]:
    #     tmp.append(ObjectId(resp[‘_source‘][‘mongo_id‘]))

query_in_es()

query_in_mongo(tmp)

运行结果如下:

第一行:查询的doc个数:28320

第二行:es查询所用时间:0.36s

第三行:mongo使用_id查询所用时间 :0.34s

技术分享图片

从结果来看对于3亿多数据的查询Elasticsearch的速度还是相当不错的

※Elasticsearch主要的优势在于可以进行分词模糊查询,所以股票K线并不是完全适应此场景。

※Elasticsearch+Mongo这个架构主要针对:使用mongo存储海量数据,且这张表更新频繁。

 

技术分享图片
技术分享图片
技术分享图片
技术分享图片
技术分享图片
技术分享图片
技术分享图片
技术分享图片
技术分享图片

Elasticsearch+Mongo亿级别数据导入及查询实践

标签:tag   arch   turn   查询   raise   批量插入   图片   func   doc   

原文地址:https://www.cnblogs.com/dxf813/p/8371214.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!