码迷,mamicode.com
首页 > 编程语言 > 详细

ELK-Python(三)

时间:2016-06-20 23:56:03      阅读:349      评论:0      收藏:0      [点我收藏+]

标签:

 不具有通用性,留作纪念。

[root@GXB-CTRLCENTER python]# cat insert_uv.py
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from datetime import *
from with_conn_to_db import conn_to_mysql
import urllib2,json
import time

### Define yesterday's 00:00 -> today's 00:00 window, in epoch milliseconds ###
today = date.today()
yestoday = today - timedelta(days=1)
# Midnight at the start of yesterday / today, rendered as "YYYY-MM-DD HH:MM:SS".
a = str(yestoday) + " " + "00:00:00"
b = str(today) + " " + "00:00:00"
timeArray1 = time.strptime(a, "%Y-%m-%d %H:%M:%S")
timeArray2 = time.strptime(b, "%Y-%m-%d %H:%M:%S")
# time.mktime() interprets the struct as local time and returns seconds;
# the search below filters with "epoch_millis", hence the * 1000.
start_time = int(time.mktime(timeArray1)) * 1000
end_time = int(time.mktime(timeArray2)) * 1000

##### Elasticsearch endpoint and index pattern for the search #####
server = "http://elk.xkops.com:9200/"
# Wildcard pattern: matches every index whose name starts with "client-".
index = "client-*"
url = server + index + "/_search?pretty=true"

def _count_sum():
    """Return the metric aggregation that sums the "count" field."""
    return {"sum": {"field": "count"}}


def _terms_by_count(field, size, nested=None):
    """Return a terms aggregation on *field*, ordered by summed count (desc).

    Every level carries the "1" sum metric it is ordered by; *nested* adds
    further sub-aggregations next to it.
    """
    sub_aggs = {"1": _count_sum()}
    if nested:
        sub_aggs.update(nested)
    return {
        "terms": {"field": field, "size": size, "order": {"1": "desc"}},
        "aggs": sub_aggs,
    }


# Restrict the search to documents whose @timestamp lies in the time window.
_timestamp_range = {
    "range": {
        "@timestamp": {
            "gte": start_time,
            "lte": end_time,
            "format": "epoch_millis",
        }
    }
}

# Aggregation tree: "4" tenant -> "3" client type (top 2) -> "2" session id.
_session_agg = _terms_by_count("sessionId", 1000000)
_client_type_agg = _terms_by_count("client_type", 2, {"2": _session_agg})
_tenant_agg = _terms_by_count("visit_tenant_id", 1000000, {"3": _client_type_agg})

query_date = {
    "query": {
        "filtered": {
            "query": {
                "query_string": {"analyze_wildcard": True, "query": "*"},
            },
            "filter": {
                "bool": {"must": [_timestamp_range], "must_not": []},
            },
        }
    },
    # Only the aggregation results are needed, so request no hits.
    "size": 0,
    "aggs": {"4": _tenant_agg},
}


# Serialize the query and send it to the search URL; passing a data argument
# to urllib2.Request makes the request a POST.
query_date = json.dumps(query_date)
req = urllib2.Request(url, query_date)
response = urllib2.urlopen(req)
try:
    page = response.read()
finally:
    # Always release the HTTP response, even if reading the body fails.
    response.close()
# Decode the JSON body into the dict the insert loop walks.
result = json.loads(page)

### Delete yesterday's rows before inserting, so running the job several
### times on the same day does not produce duplicate rows.
# NOTE(review): the SQL below is built by string interpolation from values
# found in log data; if the object yielded by conn_to_mysql supports a
# parameterized execute(), switch to it -- TODO confirm its API.
sql = "delete from uv_stat where create_time = '%s'" % (yestoday)
with conn_to_mysql("logstash") as db:
    db.execute(sql)

# Walk the aggregation tree: "4" = per tenant, "3" = per client type,
# "2" = per session id. One row is inserted per (tenant, client type, session).
for s in result["aggregations"]["4"]["buckets"]:
    tenant_id = s["key"]
    # Iterating (instead of indexing [0] and [1]) covers zero, one or two
    # client-type buckets without raising IndexError.
    for type_bucket in s["3"]["buckets"]:
        client_type = type_bucket["key"]
        for session_bucket in type_bucket["2"]["buckets"]:
            session_id = session_bucket["key"]
            sql = "insert into uv_stat(tenant_id,create_time,session_id,client_type) values('%s','%s','%s','%s')" % (tenant_id, yestoday, session_id, client_type)
            # NOTE(review): one connection per row mirrors the original
            # behavior; reusing a single connection is likely cheaper but
            # depends on conn_to_mysql's commit semantics -- verify first.
            with conn_to_mysql("logstash") as db:
                db.execute(sql)

 

ELK-Python(三)

标签:

原文地址:http://www.cnblogs.com/xkops/p/5602086.html

(0)
(0)
   
举报
评论 一句话评论(0)
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!