标签:parse zip spring 逻辑 conf RoCE mes 方便 ESS
ELK架构日志处理逻辑:1、业务层Filebeat安装时会自动获取主机运行站点域名及环境信息新增channel及env标签,并将channel的值作为kafka的topic信息
2、Kafka收到Filebeat的新增字段及Topic信息,自动创建Topic信息,以等待logstash消费
3、Logstash根据脚本自动生成input及output配置
这里的topic一定和filebeat的channel一致。
示范:
filebeat层:
- type: log
processors:
- add_fields:
fields:
env: "prod" ## ansible调用Python根据网段信息自动判断生成
ip: "10.12.11.27" ## ansible调用Python根据网段信息自动判断生成
apptype: "service" ## ansible调用Python根据域名自动判断生成
channel: "cms.prod.tarscorp.com" ##ansible调用Python根据站点目录生成
enabled: true
paths:
- /data1/logs/cms.prod.tarscorp.com/*.log
output.kafka:
codec.json:
pretty: true
escape_html: false
hosts: ["kafka1.mgt.tarscorp.com:9092", "kafka2.mgt.tarscorp.com:9092", "kafka3.mgt.tarscorp.com:9092"]
topic: ‘cms.prod.tarscorp.com‘ ## topic和channel取自同一数据
partition.round_robin:
reachable_only: false
required_acks: 1
compression: gzip
max_message_bytes: 1000000
Kafka层:忽略,集群+开启自动创建Topic即可
logstash层:
vim prod-input.conf ##输入信息
kafka {
topics => "cms.prod.tarscorp.com" ## kafka中的topic信息
bootstrap_servers => "kafka1.mgt.tarscorp.com:9092,kafka2.mgt.tarscorp.com:9092,kafka3.mgt.tarscorp.com:9092"
decorate_events => false
group_id => "logstash-tars"
# consumer_threads => 5
client_id => "mgt-elk-logstash1-prod"
codec => "json"
add_field => {"topic"=>"cms.prod.tarscorp.com"} ##这里主要是为了方便logstash做判断
}
vim prod-javasite.conf ##输出信息
if [topic] == "cms.prod.tarscorp.com" {
elasticsearch {
hosts => ["mgt-elk-esmaster1:9200", "mgt-elk-esmaster2:9200", "mgt-elk-esmaster3:9200"]
manage_template => false
index => "prod-javasite-%{+YYYY.MM.dd}"
}
}
说明:我们这里技术栈为Java spring,所以所有的Java站点都会放在[prod-javasite-%{+YYYY.MM.dd}]的索引下,由于配置环节众多,所以在站点上架时,采用分发机器先部署服务,然后ansible部署filebeat,其中ansible会通过Python脚本来获取服务器网段及站点信息通过templates来补充channel、Topic、apptype、env、ip标签生成配置,实现自动判断,减轻运维参与环节负担。
使用脚本生成举例:
./add_info.py --env prod --topic cms.prod.tarscorp.com --module javasite
vim add_info.py
#!/usr/bin/env python3
import os,sys,argparse
parser = argparse.ArgumentParser(description=‘Logstash configuration file add tools‘)
parser.add_argument(‘--env‘,type=str,required=True,help=‘环境信息‘)
parser.add_argument(‘--topic‘,type=str,required=True,help=‘Topic信息‘)
parser.add_argument(‘--module‘,type=str,required=True,help=‘模块信息‘)
args = parser.parse_args()
env_info = args.env
topic_name = args.topic
module_info = args.module
date = "%{+YYYY.MM.dd}"
template_input = ‘‘‘
kafka {
topics => "%s"
bootstrap_servers => "kafka1.mgt.tarscorp.com:9092,kafka2.mgt.tarscorp.com:9092,kafka3.mgt.tarscorp.com:9092"
decorate_events => false
group_id => "logstash-tars"
# consumer_threads => 5
client_id => "mgt-elk-logstash1-dev"
codec => "json"
add_field => {"topic"=>"%s"}
}
}
‘‘‘ %(topic_name,topic_name)
template_output = ‘‘‘
if [topic] == "%s" {
elasticsearch {
hosts => ["mgt-elk-esmaster1:9200", "mgt-elk-esmaster2:9200", "mgt-elk-esmaster3:9200"]
manage_template => false
index => "%s-%s-%s"
}
}
}
‘‘‘ %(topic_name,env_info,module_info,date)
init_input = ‘‘‘
input {
‘‘‘
init_output = ‘‘‘
output {
‘‘‘
path_home = "/etc/logstash/conf.d/"
input_file = "/etc/logstash/conf.d/%s-input.conf" % (env_info)
output_file = "/etc/logstash/conf.d/%s-%s.conf" % (env_info, module_info)
if os.path.exists(path_home) == False:
print(‘请在logstash主机运行该脚本‘)
exit(code=255)
if os.path.exists(input_file) == False:
with open(input_file, mode=‘w‘, encoding=‘utf-8‘) as f:
f.write(init_input)
if os.path.exists(output_file) == False:
with open(output_file, mode=‘w‘, encoding=‘utf-8‘) as f:
f.write(init_output)
with open(input_file,mode=‘rb+‘) as f:
f.seek(-2,2)
f.write(template_input.encode(‘utf-8‘))
with open(output_file,mode=‘rb+‘) as f:
f.seek(-2,2)
f.write(template_output.encode(‘utf-8‘))
标签:parse zip spring 逻辑 conf RoCE mes 方便 ESS
原文地址:https://blog.51cto.com/swiki/2492425