码迷,mamicode.com
首页 > 其他好文 > 详细

ELK简单安装测试

时间:2019-04-09 20:55:27      阅读:221      评论:0      收藏:0      [点我收藏+]

标签:sans   mep   生成   inf   gis   writable   方法   fine   use   

1 介绍组件

Filebeat是一个日志文件托运工具,在你的服务器上安装客户端后,filebeat会监控日志目录或者指定的日志文件,追踪读取这些文件(追踪文件的变化,不停的读)。

Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据。

ElasticSearch它提供了一个分布式多用户能力的全文搜索引擎,基于RESTful web接口。Elasticsearch 中的 Index 是一组具有相似特征的文档集合,类似于关系数据库模型中的数据库实例,Index 中可以指定 Type 区分不同的文档,类似于数据库实例中的关系表,Document 是存储的基本单位,都是 JSON 格式,类似于关系表中行级对象。我们处理后的 JSON 文档格式的日志都要在 Elasticsearch 中做索引,相应的 Logstash 有 Elasticsearch output 插件,对于用户是透明的。

Hadoop 生态圈为大规模数据集的处理提供多种分析功能,但实时搜索一直是 Hadoop 的软肋。如今,Elasticsearch for Apache Hadoop(ES-Hadoop)弥补了这一缺陷,为用户整合了 Hadoop 的大数据分析能力以及 Elasticsearch 的实时搜索能力。

Logstash 是一种功能强大的信息采集工具,类似于 Hadoop 生态圈里的 Flume。通常在其配置文件规定 Logstash 如何处理各种类型的事件流,一般包含 input、filter、output 三个部分。Logstash 为各个部分提供相应的插件,因而有 input、filter、output 三类插件完成各种处理和转换;另外 codec 类的插件可以放在 input 和 output 部分通过简单编码来简化处理过程。

Kibana是ElasticSearch的用户界面。

在实际应用场景下,为了满足大数据实时检索的场景,利用Filebeat去监控日志文件,将Kafka作为Filebeat的输出端,Kafka实时接收到Filebeat后以Logstash作为输出端输出,到Logstash的数据也许还不是我们想要的格式化或者特定业务的数据,这时可以通过Logstash的一些过了插件对数据进行过滤最后达到想要的数据格式以ElasticSearch作为输出端输出,数据到ElasticSearch就可以进行丰富的分布式检索了。

2 下载安装包

下载 elasticsearch、logstash、kibana、filebeat 的压缩包,并将四个压缩包上传到 /data/elk 目录下

技术图片

官方文档:
Filebeat:
https://www.elastic.co/cn/products/beats/filebeat
Logstash:
https://www.elastic.co/cn/products/logstash
Kibana:
https://www.elastic.co/cn/products/kibana
Elasticsearch:
https://www.elastic.co/cn/products/elasticsearch
elasticsearch中文社区:
https://elasticsearch.cn/

3 安装ELK

3.1 安装jdk1.8

3.2 安装ElasticSearch

3.2.1 安装配置es

说明:ElasticSearch的运行不能用root执行,自己用useradd命令新建一个用户如下所示:
添加普通用户elk并设置密码
useradd elk
passwd elk 然后根据提示输入密码即可

修改文件所有者
chown -R elk:elk /data/elk

解压
tar -zxvf elasticsearch-6.7.0.tar.gz

修改 es 的配置文件
cd /data/elk/elasticsearch-6.7.0/config
vim elasticsearch.yml
修改方法参考如下:

cluster.name: my-application
node.name: node-1
node.attr.rack: r1
path.data: /data/elk/elasticsearch-6.7.0/data
path.logs: /data/elk/elasticsearch-6.7.0/logs
network.host: localhost
http.port: 9200

3.2.2 启动

su elk
./bin/elasticsearch -d

启动时间有点慢,耐心等待10-20s或者更长,查看9200,9300端口是否开启

netstat -tnlp|grep 9[23]00

tcp6       0      0 localhost:9200       :::*                    LISTEN      30155/java          
tcp6       0      0 localhost:9300       :::*                    LISTEN      30155/java

访问地址:http://localhost:9200

技术图片

停止服务:

ps -ef |grep elasticsearch 
kill PID

3.3 安装filebeat

3.3.1 安装配置

filebeat:部署在具体的业务机器上,通过定时监控的方式获取增量的日志,并转发到logstash、elasticsearch、kafka等等。

解压
tar –zxvf filebeat-6.7.0-linux-x86_64.tar.gz
cd filebeat-6.7.0-linux-x86_64
修改kibana 的配置文件
vim filebeat.yml

===================Filebeat prospectors ===========

filebeat.prospectors:

#
 Each - is a prospector. Most options can be set at the prospector level, so
# you can use different prospectors for various configurations.
# Below are the prospector specific configurations.

- input_type: log

  #
 Paths that should be crawled and fetched. Glob based paths.
  paths:
    - /data/elk/logfile
#- c:\programdata\elasticsearch\logs\*

#
输出到logstash
#-------------------- Logstash output --------------------------------
output.logstash:
  # The Logstash hosts
  hosts: ["localhost:5044"]

3.3.2 启动

./filebeat -e -c filebeat.yml -d "publish"

3.4 安装logstash

3.4.1 安装配置

解压
tar -zxvf logstash-6.7.0.tar.gz
cd logstash-6.7.0
配置
在logstash文件夹的下bin目录创建配置文件logstash.conf ,内容如下:
input:对应输入的配置,其中path是监控的文**件路劲, codec编码个格式
output:elasticsearch : { hosts => "localhost:9200" }
对应输出到elasticsearch,hosts是elasticsearch安装地址

input {
    beats {
        host => "localhost"
        port => 5044 
        codec => json
    }
}

output {

    stdout {
        codec => rubydebug
    }

    elasticsearch {
        hosts => "localhost:9200"
        index="test_index"
    }
}

3.4.2 启动

测试你的配置文件 是否正确( 解析配置文件并报告任何错误。)
./logstash -f logstash.conf --config.test_and_exit

Sending Logstash logs to /data/elk/logstash-6.7.0/logs which is now configured via log4j2.properties
[2019-03-28T17:55:34,114][INFO ][logstash.setting.writabledirectory] Creating directory {:setting=>"path.queue", :path=>"/data/elk/logstash-6.7.0/data/queue"}
[2019-03-28T17:55:34,169][INFO ][logstash.setting.writabledirectory] Creating directory {:setting=>"path.dead_letter_queue", :path=>"/data/elk/logstash-6.7.0/data/dead_letter_queue"}
[2019-03-28T17:55:34,856][WARN ][logstash.config.source.multilocal] Ignoring the ‘pipelines.yml‘ file because modules or command line options are specified
Configuration OK
[2019-03-28T17:55:43,327][INFO ][logstash.runner          ] Using config.test_and_exit mode. Config Validation Result: OK. Exiting Logstash

启动命令(启用自动配置重新加载,这样就不必每次修改配置文件时都停止并重新启动Logstash )
./logstash -f logstash.conf --config.reload.automatic

停止服务
ps aux | grep logstash
kill -9 pid

3.5 安装kibana

3.5.1 安装配置

解压
tar –zxvf kibana-6.7.0-linux-x86_64.tar.gz
cd kibana-6.7.0-linux-x86_64/config
配置
vim kibana.yml

server.port: 5601
server.host: "localhost"

#
elasticsearch.username: "elastic"
#elasticsearch.password: "changeme"

elasticsearch.hosts: ["http://localhost:9200"]
kibana.index: ".kibana"

3.5.2 启动

./bin/kibana

访问地址:
http://localhost:5601

技术图片

netstat -anltp|grep 5601查找对应端口
kill -9 pid

4 测试

4.1 监测指定文件,Kibana展示

4.1.1 Logstash监控文件

我们通过echo往这文件/data/elk/test.log追加内容,来测试整个日志收集系统是否可行
vim test2.conf

input {
  file {
    type =>"syslog"
     path => ["/data/elk/test.log" ]
  }
  syslog {
    type =>"syslog"
    port =>"5544"
  }
}
output {
  stdout { codec=> rubydebug }
  elasticsearch {hosts => "http:// localhost:9200"}
}

启动
./logstash -f test2.conf --config.reload.automatic

向监控文件中写入数据

echo "{ "firstName": "1", "lastName":"McLaughlin", "email": "aaaa" }" >> test.log 

Logstash打印接收到的数据:

{
    "message" => "{ firstName: 1, lastName:McLaughlin, email: aaaa }",
    "@timestamp" => 2019-03-29T02:39:35.229Z,
      "@version" => "1",
          "host" => "node223",
          "path" => "/data/elk/test.log"
}

4.1.2 es 查看接收数据

查看es所有index
curl -X GET ‘http://localhost:9200/_cat/indices?v‘

health status index                uuid                   pri rep docs.count docs.deleted store.size pri.store.size
green  open   .kibana_task_manager ZYN7zZl3QLuB1d-EE_ysHA   1   0          2            0     12.5kb         12.5kb
yellow open   logstash-2019.03.29  rfkkxW7rQDKnl40_Ekaf7g   5   1          2            0     10.8kb         10.8kb
yellow open   test-index           hFBQF-jZSkqajPRYzegfwg   5   1          0            0      1.2kb          1.2kb
green  open   .kibana_1            sEUUnVMxQ0amHbQGcOedOQ   1   0          4            1     19.8kb         19.8kb

查看单个index数据
curl ‘localhost:9200/logstash-2019.03.29/_search?pretty=true‘

{
  "took" : 4,
  "timed_out" : false,
  "_shards" : {
    "total" : 5,
    "successful" : 5,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : 2,
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "logstash-2019.03.29",
        "_type" : "doc",
        "_id" : "W85Rx2kBAxlZ_OvPIlZf",
        "_score" : 1.0,
        "_source" : {
          "message" : "{ firstName: 1, lastName:McLaughlin, email: aaaa }",
          "@timestamp" : "2019-03-29T02:39:35.229Z",
          "@version" : "1",
          "host" : "node223",
          "path" : "/data/elk/test.log"
        }
      }
    ]
  }
}

4.1.3 用ES连接到Kibana

在你开始用Kibana之前,你需要告诉Kibana你想探索哪个Elasticsearch索引。第一次访问Kibana是,系统会提示你定义一个索引模式以匹配一个或多个索引的名字。

1、访问Kibana UI。例如,localhost:56011 或者 http://YOURDOMAIN.com:5601
2、指定一个索引模式来匹配一个或多个你的Elasticsearch索引。当你指定了你的索引模式以后,任何匹配到的索引都将被展示出来。
  (画外音:*匹配0个或多个字符; 指定索引默认是为了匹配索引,确切的说是匹配索引名字)
3、点击“Next Step”以选择你想要用来执行基于时间比较的包含timestamp字段的索引。如果你的索引没有基于时间的数据,那么选择“I don’t want to use the Time Filter”选项。
4、点击“Create index pattern”按钮来添加索引模式。第一个索引模式自动配置为默认的索引默认,以后当你有多个索引模式的时候,你就可以选择将哪一个设为默认。(提示:Management > Index Patterns)

技术图片

技术图片

现在,Kibana已经连接到你的Elasticsearch数据。Kibana展示了一个只读的字段列表,这些字段是匹配到的这个索引配置的字段。

4.1.4 Kibana展示

http://localhost:5601

4.2 监控Spring Boot服务日志

要求日志格式为json格式

4.2.1 json日志格式配置

在Spring Boot工程resources目录下配置logback.xml文件,旨在将log4j日志按json格式输出到指定目录,logback.xml文件内容如下:

<?xml version="1.0" encoding="UTF-8"?>
<configuration debug="false">
   <!--定义日志文件的存储地址 勿在 LogBack 的配置中使用相对路径 
   <property name="LOG_HOME" value="E:/demo/home" />
   -->

   <!--格式化输出:%d表示日期,%thread表示线程名,%-5level:级别从左显示5个字符宽度%msg:日志消息,%n是换行符 -->
   <!--<property name="LOG_PATTERN" value="[  %d{yyyy-MM-dd HH:mm:ss.SSS}] [UUID:%X{requestId}] [%level]|[${HOSTNAME}] [%thread]|[%logger{36}] | %msg|%n " />-->
   <property name="TIME_PATTERN" value="yyyy-MM-dd‘T‘HH:mm:ss.SSS‘Z‘" />
   <property name="LOG_PATTERN" value=‘{"event":"log","timestamp":"%d{${TIME_PATTERN}}","level":"%level","serviceName":"${springAppName:-}","pid": "${PID:-}","host":"${HOSTNAME}","class": "%logger{40}","thread":"%thread","message":"%msg"}%n ‘ />

   <!-- Console 输出设置 -->
   <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
      <encoder>
         <pattern>${LOG_PATTERN}</pattern>
         <charset>utf8</charset>
      </encoder>
   </appender>

   <!-- 按照每天生成日志文件 -->
   <appender name="FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
      <file>logs/newsinfo-dataservice.log</file>
      <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
         <FileNamePattern>logs/newsinfo-dataservice.%d{yyyy-MM-dd}.log</FileNamePattern>
      </rollingPolicy>
      <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
         <pattern>${LOG_PATTERN}</pattern>
      </encoder>
   </appender>

   <!--log4jdbc -->
   <logger name="com.sohu" level="debug" additivity="false">
        <appender-ref ref="STDOUT" />
        <appender-ref ref="FILE" />    
    </logger>

   <!-- 日志输出级别 -->
   <root level="info">
      <appender-ref ref="STDOUT" />
      <appender-ref ref="FILE" />
   </root>
</configuration>

输出日志示例如下:

{"event":"log","timestamp":"2019-04-09T19:47:07.908Z","level":"INFO","serviceName":"springAppName_IS_UNDEFINED","pid": "7196","host":"node1","class": "o.s.b.web.servlet.FilterRegistrationBean","thread":"localhost-startStop-1","message":"Mapping filter: ‘MyFilter‘ to urls: [/getUser, /hello]"}

4.2.2 DomeOS启动Spring Boot服务

启动服务的同事,配置日志收集到kafka,然后调用ELK读取kafka日志存入ES,进行日志的全文检索和实时告警分析。

技术图片

创建新版本时,如上配置下日志收集模块,日志就自动通过Flume接入Kafka了。

路径:/code/logs/newsinfo-dataservice.log
自动收集 :自动将日志收集到 Kafka
Kafka Topic:newsinfo_appdata_service_log

ELK简单安装测试

标签:sans   mep   生成   inf   gis   writable   方法   fine   use   

原文地址:https://www.cnblogs.com/xiaodf/p/10679336.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!