
Reading Elasticsearch with PySpark

Date: 2020-07-17


Code:

import json
from pyspark.sql import SparkSession
from pyspark import SparkConf


def trans_form(data_tuple):
    """
    Convert each record read from ES: keep only the document body,
    i.e. the second element of the (_id, document) tuple.
    :param data_tuple: (_id, json_string) tuple from the RDD
    :return: the document (a JSON string)
    """
    return data_tuple[1]


def get_es_conf(es_host, es_port, index, type_, query_dic):
    query = {"query": {"match_all": {}}}
    if isinstance(query_dic, dict):
        query = json.dumps(query_dic)
    else:
        query = json.dumps(query)

    es_read_conf = {
        "es.nodes": es_host,
        "es.port": es_port,  # must be a string
        "es.resource": "{}/{}".format(index, type_),
        "es.output.json": "yes",
        "es.query": query
    }
    return es_read_conf


def read_data_from_es(sc, es_host, es_port, index, type_, query_dic):
    es_read_conf = get_es_conf(es_host, es_port, index, type_, query_dic)
    es_rdd = sc.newAPIHadoopRDD(
        inputFormatClass="org.elasticsearch.hadoop.mr.EsInputFormat",
        keyClass="org.apache.hadoop.io.NullWritable",
        valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
        conf=es_read_conf
    )
    return es_rdd


if __name__ == "__main__":
    conf = SparkConf()
    spark = SparkSession.builder.config(conf=conf).appName("test").getOrCreate()
    sc = spark.sparkContext

    es_host = "127.0.0.1"
    es_port = "9200"  # must be a string
    index = "test"
    type_name = "result"
    query = {"query": {"match_all": {}}}
    es_rdd = read_data_from_es(sc, es_host, es_port, index, type_name, query)

    # Each record is a (_id, document) tuple; keep the document part
    # and filter out empty values to get the data we want.
    rdd = es_rdd.map(lambda x: trans_form(x)).filter(lambda x: x)
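Because `es.output.json` is set to `"yes"`, each RDD element arrives as an (`_id`, JSON-string) tuple. A minimal local sketch of the map-and-filter step, using hypothetical sample tuples so it runs without an Elasticsearch cluster, followed by `json.loads` to turn each surviving document into a dict:

```python
import json

# Hypothetical sample of what the connector yields when
# "es.output.json" is "yes": (_id, json_string) tuples.
sample = [
    ("doc-1", '{"name": "alice", "age": 30}'),
    ("doc-2", '{"name": "bob", "age": 25}'),
    ("doc-3", ""),  # an empty value that the filter step drops
]

def trans_form(data_tuple):
    # Keep only the document body, as in the function above.
    return data_tuple[1]

# Equivalent of es_rdd.map(trans_form).filter(lambda x: x), then parsing.
docs = [json.loads(s) for s in map(trans_form, sample) if s]
print(docs)  # [{'name': 'alice', 'age': 30}, {'name': 'bob', 'age': 25}]
```

On a real cluster, the mapped JSON strings can also be fed directly to `spark.read.json` to get a DataFrame instead of parsing them by hand.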

 


Original article: https://www.cnblogs.com/tjp40922/p/13332679.html
