方 案
- Filebeat->Logstash->Files
- Filebeat->Redis->Logstash->Files
- Nxlog(Rsyslog、Logstash)->Kafka->Flink(Logstash->ES-Kibana)
- 其他方案(可根据自己需求,选择合适的架构,作者选择了第二种方案)
注释: 由于Logstash无法处理输出到文件乱序的问题,可通过不同的文件使用不同的Logstash;或者直接写入ES(不存在乱序问题)、通过Flink输出到文件
部 署
系统环境
- Debian8 x64
- logstash-6.1.1
- filebeat-6.1.1-amd64
- Redis-3.2
Filebeat配置
/etc/filebeat/filebeat.yml filebeat.prospectors: - type: log paths: - /home/data/log/* - /home/data/*.log scan_frequency: 20s encoding: utf-8 tail_files: true harvester_buffer_size: 5485760 fields: ip_address: 192.168.2.2 env: qa output.redis: hosts: ["192.168.1.1:6379"] password: "geekwolf" key: "filebeat" db: 0 timeout: 5 max_retires: 3 worker: 2 bulk_max_size: 4096
Logstash配置
input { #Filebeat # beats { # port => 5044 # } #Redis redis { batch_count => 4096 data_type => "list" key => "filebeat" host => "127.0.0.1" port => 5044 password => "geekwolf" db => 0 threads => 2 } } filter { ruby { code => ‘event.set("filename",event.get("source").split("/")[-1])‘ } } output { if [filename] =~ "nohup" { file { path => "/data/logs/%{[fields][env]}/%{+YYYY-MM-dd}/%{[fields][ip_address]}/%{filename}" flush_interval => 3 codec => line { format => "%{message}"} } } else { file { path => "/data/logs/%{[fields][env]}/%{+YYYY-MM-dd}/%{[fields][ip_address]}/logs/%{filename}" flush_interval => 3 codec => line { format => "%{message}"} } } #stdout { codec => rubydebug } }
生产日志目录
├── prod │ └── 2018-01-13 │ └── 2.2.2.2 │ ├── logs │ │ ├── rpg_slow_db_.27075 │ └── nohup_service.log └── qa ├── 2018-01-12 │ ├── 192.168.3.1 └── 2018-01-13 ├── 192.168.3.2