码迷,mamicode.com
首页 > 编程语言 > 详细

python中使用pyspark 读取和整理日志数据并将数据写入到es中去

时间:2020-06-14 17:09:05      阅读:125      评论:0      收藏:0      [点我收藏+]

标签:apr   dex   import   href   art   group   整理   amp   data   

代码:

import re
import datetime
from pyspark.sql import SparkSession
from pyspark import SparkContext
from elasticsearch import Elasticsearch
spark=SparkSession.builder.appName("lz").getOrCreate()
sc = SparkContext.getOrCreate()
es = Elasticsearch()
month_map = {Jan: 1, Feb: 2, Mar:3, Apr:4, May:5, Jun:6, Jul:7,
    Aug:8,  Sep: 9, Oct:10, Nov: 11, Dec: 12}

log_data = sc.textFile("/Desktop/data_doc/data_Log/sshlogin/03.txt") #使用spark读取本地日志文件


for b in log_data.toLocalIterator(): 
    #以迭代的方式来把一条条数据读取出来进行正则匹配,并最终将 dict作为body写入到es中去
    # e=‘Ambari:Mar  2 02:14:16 ambari sshd[16716]: Accepted password for root from 172.21.202.174 port 59886 ssh2‘#日志格式
    log_group=re.search(^(\S+):(\w{3})\s+(\d{1,2})\s(\d{2}:\d{2}:\d{2})\s(\S+)\s(\S+)\[(\d+)\]:\s(.+),b)
    if log_group:
        year=2019
        try:
            logtime = year+-+month_map[log_group.group(2)]+-+log_group.group(3)+ +log_group.group(4) #将字段拼接成年月日的格式
            logtime = datetime.datetime.strptime(logtime,%Y-%m-%d %H:%M:%S)
        except Exception as e:
           pass
        row = dict(_hostname=log_group.group(1), #将数据组成一个字典  k,v
                  syslog_timestamp=logtime,
                  hostname=log_group.group(5),
                  program=log_group.group(6),
                  pid=log_group.group(7),
                  msg = log_group.group(8))
        if re.match(^Accepted password for,row[msg]) or re.match(^Accepted publickey for,row[msg]) :

            msg_a=re.search(Accepted\s\w+\sfor\s(\S+)\sfrom\s(\d{2,3}\.\d{2,3}\.\d{2,3}\.\d{2,3})\sport\s(\d+),row[msg])
            row[login_success]=True
            row[login_success_msg]={username:msg_a.group(1),user_ip:msg_a.group(2),user_port:msg_a.group(3)}
        es.index(index=data_log02,doc_type=test02,body=row) #将数据写入到es中去
    else:
        break

 

 

转自:https://www.cnblogs.com/wangkun122/articles/10936938.html

python中使用pyspark 读取和整理日志数据并将数据写入到es中去

标签:apr   dex   import   href   art   group   整理   amp   data   

原文地址:https://www.cnblogs.com/tjp40922/p/13125182.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!