标签:elk elasticsearch logstash 日志分析
上图为http://www.cnblogs.com/delgyd/p/elk.html#3656833 中的图
架构解读 : (整个架构从左到右,总共分为5层)(本文将第三层以下的进行了合并,无elasticsearch集群)
第一层、数据采集层
最左边的是业务服务器集群,上面安装了filebeat做日志采集,同时把采集的日志分别发送给两个logstash服务。
第二层、数据处理层,数据缓存层
logstash服务把接受到的日志经过格式处理,转存到本地的kafka broker+zookeeper 集群中。
第三层、数据转发层
这个单独的Logstash节点会实时去kafka broker集群拉数据,转发至ES DataNode。
第四层、数据持久化存储
ES DataNode 会把收到的数据,写磁盘,建索引库。
第五层、数据检索,数据展示
ES Master + Kibana 主要 协调 ES集群,处理数据检索请求,数据展示。
filebeat-5.5.2-1.x86_64
#vim filebeat.yml
filebeat.modules:
filebeat.prospectors:
- input_type: log
paths: #定义读取log的路径,此处为每个项目一个路径,可以写多个或者用* 匹配
- /usr/local/nginx1.6/logs/sso.so.duia.com.log
include_lines: [ ]
multiline: #合并多行,下一行不是[ 开头,合并到上一行
pattern: '^\['
negate: true
match: after
document_type: sso-so #定义type,提供给logstash 引用,并最终定义elasticsearch 索引
tail_files: true
output.kafka: #输出到kafka中
enabled: true
hosts: ["172.16.101.76:9092"]
topic: nginx #定义消费队列,如果多个logstash消费,需要定义Parttion
compression: Snappy
max_message_bytes: 1000000
nohup /usr/local/filebeat/filebeat -e -c /usr/local/filebeat/logs.yml -d "publish" &>> /data/logs/filebeat.log &
zookeeper-3.4.9.tar.gz
#vim zoo.cfg
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/data/zookeeper #定义数据存放位置
clientPort=2181
server.1=172.16.101.76:12888:13888
server.2=172.16.101.175:12888:13888
server.3=172.16.101.172:12888:13888
cat /data/zookeeper/myid
1
Zookeeper其他节点请参考此配置文件,只有myid不同。
/usr/local/elk/zookeeper/bin/zkServer.sh start
kafka_2.12-0.10.2.0.tgz
# vim server.properties
broker.id=1
port = 9092
host.name = 172.16.101.76 #监控地址
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/data/logs/kafka #log文件存放位置
num.partitions=1
num.recovery.threads.per.data.dir=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=127.0.0.1:2181 #连接zookeeper地址
zookeeper.connection.timeout.ms=6000
bin/kafka-server-start.sh config/server.properties &
logstash-5.2.2.tar.gz
input {
kafka {
bootstrap_servers => "172.16.101.76:9092"
topics => ["nginx"]
codec => "json"
decorate_events => true
}
}
input {
kafka {
bootstrap_servers => "172.16.101.76:9092"
topics => ["tomcat"]
codec => "json"
decorate_events => true
}
}
filter {
#nginx
if [type] == "nginx-access.log” {
grok {
match => {
"message" => "\[%{HTTPDATE:timestamp}\] %{IPV4:client_ip} \"%{USER:forward}\" %{USER:user} %{IPORHOST:host} \"%{WORD:method} %{URIPATHPARAM:valume} %{URIPROTO:http}/%{NUMBER:http_version}\" %{QS:request_body} %{NUMBER:status:int} \"(?:%{IPORHOST:urlname} %{POSINT:urlport})\" %{NUMBER:request_time} %{IPV4:upstream_host}:%{NUMBER:upstream_port} %{NUMBER:reponse_time} (?:%{NUMBER:bytes}|-) %{QS:referrer} %{QS:agent}"
}
remove_field => ["message"]
}
geoip {
source => "client_ip"
target => "geoip"
database => "/data/GeoIP/GeoLite2-City.mmdb"
add_field => ["location" , "%{[geoip][latitude]}, %{[geoip][longitude]}"]
}
date {
match => ["timestamp" , "dd/MMM/YYYY:HH:mm:ss Z"]
target => "@timestamp"
remove_field => ["timestamp"]
}
}
if [type] == "catalina.out" {
grok {
match => {
"message" => "%{COMMONAPACHELOG}"
}
remove_field => ["message"]
}
}
}
output {
if "_grokparsefilure" in [tags] {
file {
path => "data/logs/grokparsefailure-%{[type]}-%{+YYYY.MM}.log"
}
}
elasticsearch {
hosts => ["172.16.101.76:9200"]
index => "%{type}-%{+YYYY.MM.dd}"
template_overwrite => true
}
}
/usr/local/elk/logstash/bin/logstash -f /usr/local/elk/logstash/config/logs.yml &
elasticsearch-5.2.2.tar.gz
[root@host76 config]# grep -vE "^$|^#" elasticsearch.yml
cluster.name: Mo
node.name: node01
node.attr.rack: r1
path.data: /data/elasticsearch
path.logs: /data/logs/elasticsearch
bootstrap.memory_lock: false
network.host: 172.16.101.76
http.port: 9200
discovery.zen.ping.unicast.hosts: ["172.16.101.76","172.16.101.172"]
discovery.zen.minimum_master_nodes: 1
gateway.recover_after_nodes: 1
action.destructive_requires_name: true
bootstrap.system_call_filter: false
thread_pool.index.queue_size: 500
thread_pool.bulk.queue_size: 1000
indices.recovery.max_bytes_per_sec: 100mb
http.cors.enabled: true
http.cors.allow-origin: "*"
[root@host76 config]# grep -vE "^$|^#" jvm.options
-Xms6g
-Xmx6g
-XX:+UseConcMarkSweepGC
-XX:CMSInitiatingOccupancyFraction=75
-XX:+UseCMSInitiatingOccupancyOnly
-XX:+DisableExplicitGC
-XX:+AlwaysPreTouch
-server
-Xss1m
-Djava.awt.headless=true
-Dfile.encoding=UTF-8
-Djna.nosys=true
-Djdk.io.permissionsUseCanonicalPath=true
-Dio.netty.noUnsafe=true
-Dio.netty.noKeySetOptimization=true
-Dio.netty.recycler.maxCapacityPerThread=0
-Dlog4j.shutdownHookEnabled=false
-Dlog4j2.disable.jmx=true
-Dlog4j.skipJansi=true
-XX:+HeapDumpOnOutOfMemoryError
配置文件修改参考官方文档
https://elasticsearch.cn/book/elasticsearch_definitive_guide_2.x/dont-touch-these-settings.html
bin/elasticsearch -d
kibana-5.2.2-linux-x86_64.tar.gz
[root@host76 config]# grep -vE "^$|^#" kibana.yml
server.port: 5601
server.host: "172.16.101.76"
elasticsearch.url: "http://172.16.101.76:9200"
elasticsearch.pingTimeout: 1500
elasticsearch.requestTimeout: 30000
elasticsearch.requestHeadersWhitelist: [ authorization ]
pid.file: /usr/local/kibana/kibana.pid
logging.dest: /data/logs/kibana/kibana.log
bin/kibana &
upstream kibana {
server 172.16.101.76:5601 max_fails=3 fail_timeout=30s;
}
server {
listen 8080;
server_name localhost;
location / {
proxy_pass http://kibana/;
index index.html index.htm;
#auth
#auth_basic "kibana Private";
#auth_basic_user_file /etc/nginx/.htpasswd;
}
}
Logstash为读取kafka中的数据,并且将数据通过grok中的正则进行格式化,输出到elasticsearch中。
遇到的问题:
1、grok未生效,logstash调试
output {
stdout {
codec => rubydebug
}
}
使用debug模式,输出的内容不是grok格式好的json串,经过摸索,input 中定义
Codec => “json” 输出后为grok格式化的json格式。
2、同一种log_format定义的log信息不匹配
Grok 不需要特别的匹配到字符串格式,同时,不需要的信息,可以不进行匹配。
3、grok调试
http://grokdebug.herokuapp.com/?#
此网站在线调试,开始时需使用代理
4、logstash中output 可以动态定义索引,同时也可以指定固定索引
5、logstash 可以使用if 判断 定义input 中的数据源 和 output中的 输出及index
Elasticsearch
查看节点
curl '172.16.101.76:9200/_cat/nodes?v'
查看健康状况
curl '172.16.101.76:9200/_cat/health?v'
清理缓存
curl http://127.0.0.1:9200/logstash-*/_cache/clear
查看索引
curl -s 'http://172.16.101.76:9200/_cat/indices?v'
查看elasticsearch线程情况
curl -XGET http://xxxx:9200/_nodes/stats/thread_pool?pretty
清理索引
curl -XDELETE 'http://172.16.101.76:9200/*'
批量清理指定日期的索引
#curl -s 'http://172.16.101.76:9200/_cat/indices?v' | sort | awk '{print $3}' > del_index.txt
#for i in `grep 2017.12.22 del_index.txt` ;do curl -XDELETE "http://172.16.101.76:9200/${i}" && sleep 10 ;done
查看elasticsearch 所有模版
curl -XGET localhost:9200/_template | python -m json.tool
查看索引的mapping
curl -XGET http://127.0.0.1:9200/*/_mapping/
删除elasticsearch 索引模版
curl -XDELETE localhost:9200/_template/*
添加自定义模版
curl -XPUT localhost:9200/_template/nginx -d@template.json
Template.json
{
"aliases": {},
"mappings": {
"_default_": {
"_all": {
"enabled": true,
"norms": false
},
"dynamic_templates": [
{
"message_field": {
"mapping": {
"norms": false,
"type": "text"
},
"match_mapping_type": "string",
"path_match": "message"
}
},
{
"string_fields": {
"mapping": {
"fields": {
"keyword": {
"type": "keyword"
}
},
"norms": false,
"type": "text"
},
"match": "*",
"match_mapping_type": "string"
}
}
],
"properties": {
"@timestamp": {
"include_in_all": false,
"type": "date"
},
"@version": {
"include_in_all": false,
"type": "keyword"
},
"geoip": {
"dynamic": true,
"properties": {
"ip": {
"type": "ip"
},
"latitude": {
"type": "half_float"
},
"location": {
"type": "geo_point"
},
"longitude": {
"type": "half_float"
}
}
},
"request_body": {
"ignore_above": 32766,
"index": "no",
"type": "keyword"
}
}
}
},
"order": 0,
"settings": {
"index": {
"refresh_interval": "5s"
}
},
"template": "nginx-*",
"version": 50001
}
#因为抽取了nginx 日志,在地图上不显示地区,发现elasticsearch中的模版, geoip总location字段类型不为 geo_point, 没有使用默认模版,通过修改默认模版名称,符合nginx的索引规则,然后清理了索引及历史记录,同时重建索引。地图上成功显示成功。
参考链接:
http://www.cnblogs.com/delgyd/p/elk.html#3656833
https://elasticsearch.cn/book/elasticsearch_definitive_guide_2.x/dont-touch-these-settings.html
http://blog.csdn.net/zhaoyangjian724/article/details/52337402
标签:elk elasticsearch logstash 日志分析
原文地址:http://blog.51cto.com/phospherus/2086965