标签:bin eth header return mile bubuko windows 直接 tar
只需要在自定义的JsonLayout中构造一个PatternLayout帮助format日志消息即可 日志格式内容如下
public class JsonLogBean {
private String system;
private String ip;
private String message;
private String level;
private String time;
//get set 略
public JsonLogBean(){}
public JsonLogBean(String system, String ip,String message, String level, String time) {
this.system = system;
this.ip = ip;
this.message = message;
this.level = level;
this.time = time;
}
}
public class JsonPartternLayout extends PatternLayout{
private String system;
//PatternLayout 默认将异常交给WriterAppender处理 这里改为false
public boolean ignoresThrowable() {
return false;
}
private static String ip;
private static SimpleDateFormat utcFormater;
static {
utcFormater = new SimpleDateFormat("yyyy-MM-dd‘T‘HH:mm:ss.SSS‘Z‘");
utcFormater.setTimeZone(TimeZone.getTimeZone("UTC"));//时区定义并进行时间获取
try {
ip = InetAddress.getLocalHost().getHostAddress();
} catch (UnknownHostException e) {
//ignore
}
}
@Override
public String format(LoggingEvent event) {
StringBuilder sb = new StringBuilder();
sb.append(super.format(event));
String[] s = event.getThrowableStrRep();
if (s != null) {
int len = s.length;
for (int i = 0; i < len; i++) {
sb.append(s[i]);
sb.append("\n");
}
}
String time = utcFormater.format(new Date(event.getTimeStamp()));
return JSON.toJSONString(new JsonLogBean(system, ip, sb.toString(), event.getLevel().toString(), time)) + "\n";
}
public String getSystem() {
return system;
}
public void setSystem(String system) {
this.system = system;
}
}
@Plugin(name = "JsonPartternLayout", category = Node.CATEGORY, elementType = Layout.ELEMENT_TYPE, printObject = true)
public class JsonPartternLayout extends AbstractStringLayout {
private PatternLayout patternLayout;
private String system;
private static String ip;
private static SimpleDateFormat utcFormater;
static {
utcFormater = new SimpleDateFormat("yyyy-MM-dd‘T‘HH:mm:ss.SSS‘Z‘");
utcFormater.setTimeZone(TimeZone.getTimeZone("UTC"));//时区定义并进行时间获取
try {
ip = InetAddress.getLocalHost().getHostAddress();
} catch (UnknownHostException e) {
//ignore
}
}
public JsonPartternLayout(final Configuration config, final RegexReplacement replace, final String eventPattern,
final PatternSelector patternSelector, final Charset charset, final boolean alwaysWriteExceptions,
final boolean disableAnsi, final boolean noConsoleNoAnsi, final String headerPattern,
final String footerPattern, final String system) {
super(config, charset,
newSerializerBuilder()
.setConfiguration(config)
.setReplace(replace)
.setPatternSelector(patternSelector)
.setAlwaysWriteExceptions(alwaysWriteExceptions)
.setDisableAnsi(disableAnsi)
.setNoConsoleNoAnsi(noConsoleNoAnsi)
.setPattern(headerPattern)
.build(),
newSerializerBuilder()
.setConfiguration(config)
.setReplace(replace)
.setPatternSelector(patternSelector)
.setAlwaysWriteExceptions(alwaysWriteExceptions)
.setDisableAnsi(disableAnsi)
.setNoConsoleNoAnsi(noConsoleNoAnsi)
.setPattern(footerPattern)
.build());
this.patternLayout = PatternLayout.newBuilder()
.withPattern(eventPattern)
.withPatternSelector(patternSelector)
.withConfiguration(config)
.withRegexReplacement(replace)
.withCharset(charset)
.withDisableAnsi(disableAnsi)
.withAlwaysWriteExceptions(alwaysWriteExceptions)
.withNoConsoleNoAnsi(noConsoleNoAnsi)
.withHeader(headerPattern)
.withFooter(footerPattern)
.build();
this.system = system;
}
/**
*
* @param event
* @return
*/
public String toSerializable(LogEvent event) {
String msg = this.patternLayout.toSerializable(event);
String time = utcFormater.format(new Date(event.getTimeMillis()));
return JSON.toJSONString(new JsonLogBean(system, ip,msg, event.getLevel().name(), time)) + "\n";
}
@PluginBuilderFactory
public static Builder newBuilder() {
return new Builder();
}
/**
*
*/
public static class Builder implements org.apache.logging.log4j.core.util.Builder<JsonPartternLayout> {
@PluginBuilderAttribute
private String pattern = DEFAULT_CONVERSION_PATTERN;
@PluginElement("PatternSelector")
private PatternSelector patternSelector;
@PluginConfiguration
private Configuration configuration;
@PluginElement("Replace")
private RegexReplacement regexReplacement;
// LOG4J2-783 use platform default by default
@PluginBuilderAttribute
private Charset charset = Charset.defaultCharset();
@PluginBuilderAttribute
private boolean alwaysWriteExceptions = true;
@PluginBuilderAttribute
private boolean disableAnsi = !useAnsiEscapeCodes();
@PluginBuilderAttribute
private boolean noConsoleNoAnsi;
@PluginBuilderAttribute
private String header;
@PluginBuilderAttribute
private String footer;
@PluginBuilderAttribute
private String system;
private Builder() {
}
private boolean useAnsiEscapeCodes() {
PropertiesUtil propertiesUtil = PropertiesUtil.getProperties();
boolean isPlatformSupportsAnsi = !propertiesUtil.isOsWindows();
boolean isJansiRequested = !propertiesUtil.getBooleanProperty("log4j.skipJansi", true);
return isPlatformSupportsAnsi || isJansiRequested;
}
public JsonPartternLayout build() {
// fall back to DefaultConfiguration
if (configuration == null) {
configuration = new DefaultConfiguration();
}
return new JsonPartternLayout(configuration, regexReplacement, pattern, patternSelector, charset,
alwaysWriteExceptions, disableAnsi, noConsoleNoAnsi, header, footer, system);
}
}
}
这里我们直接在flume-ng-elasticsearch-sink的源码上做修改,为避免冲突我改了相关的包名 源码见这里 pom配置如下
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-high-eslog-sink</artifactId>
<version>1.8</version>
<build>
<plugins>
<plugin>
<groupId>org.apache.rat</groupId>
<artifactId>apache-rat-plugin</artifactId>
</plugin>
</plugins>
</build>
<dependencies>
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-sdk</artifactId>
<version>1.8.0</version>
</dependency>
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-core</artifactId>
<version>1.8.0</version>
</dependency>
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
<version>6.5.4</version>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>transport</artifactId>
<version>6.5.4</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.6.1</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.6.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>commons-lang</groupId>
<artifactId>commons-lang</artifactId>
<version>2.5</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>11.0.2</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>RELEASE</version>
</dependency>
</dependencies>
</project>
flume-ng-elasticsearch-sink的源码还是比较少的,以下几个比较重要的改动点
将elasticsearch lib下的jar包copy到flume lib下 以下是我copy到flume的相关jar包 如果有jar缺失或冲突查看下flume的日志很容易就能够解决
-rw-r--r--@ 1 zhanghuan staff 10M 1 4 22:26 elasticsearch-6.5.4.jar
-rw-r--r--@ 1 zhanghuan staff 16K 1 4 22:26 elasticsearch-cli-6.5.4.jar
-rw-r--r--@ 1 zhanghuan staff 36K 1 4 22:26 elasticsearch-core-6.5.4.jar
-rw-r--r--@ 1 zhanghuan staff 12K 1 4 22:26 elasticsearch-launchers-6.5.4.jar
-rw-r--r--@ 1 zhanghuan staff 11K 1 4 22:26 elasticsearch-secure-sm-6.5.4.jar
-rw-r--r--@ 1 zhanghuan staff 110K 1 4 22:26 elasticsearch-x-content-6.5.4.jar
-rw-r--r--@ 1 zhanghuan staff 1.6M 1 4 22:37 lucene-analyzers-common-7.5.0.jar
-rw-r--r--@ 1 zhanghuan staff 98K 1 4 22:37 lucene-backward-codecs-7.5.0.jar
-rw-r--r--@ 1 zhanghuan staff 2.9M 1 4 22:37 lucene-core-7.5.0.jar
-rw-r--r--@ 1 zhanghuan staff 85K 1 4 22:37 lucene-grouping-7.5.0.jar
-rw-r--r--@ 1 zhanghuan staff 202K 1 4 22:37 lucene-highlighter-7.5.0.jar
-rw-r--r--@ 1 zhanghuan staff 143K 1 4 22:37 lucene-join-7.5.0.jar
-rw-r--r--@ 1 zhanghuan staff 50K 1 4 22:37 lucene-memory-7.5.0.jar
-rw-r--r--@ 1 zhanghuan staff 93K 1 4 22:37 lucene-misc-7.5.0.jar
-rw-r--r--@ 1 zhanghuan staff 259K 1 4 22:37 lucene-queries-7.5.0.jar
-rw-r--r--@ 1 zhanghuan staff 373K 1 4 22:37 lucene-queryparser-7.5.0.jar
-rw-r--r--@ 1 zhanghuan staff 259K 1 4 22:37 lucene-sandbox-7.5.0.jar
-rw-r--r--@ 1 zhanghuan staff 14K 1 4 22:37 lucene-spatial-7.5.0.jar
-rw-r--r--@ 1 zhanghuan staff 231K 1 4 22:37 lucene-spatial-extras-7.5.0.jar
-rw-r--r--@ 1 zhanghuan staff 295K 1 4 22:37 lucene-spatial3d-7.5.0.jar
-rw-r--r--@ 1 zhanghuan staff 240K 1 4 22:37 lucene-suggest-7.5.0.jar
-rw-r--r-- 1 zhanghuan staff 7.4K 1 7 23:04 transport-6.5.4.jar
-rw-r--r-- 1 zhanghuan staff 77K 1 7 23:08 transport-netty4-client-6.5.4.jar
-rw-r--r-- 1 zhanghuan staff 107K 1 7 23:10 reindex-client-6.5.4.jar
-rw-r--r-- 1 zhanghuan staff 72K 1 7 23:11 percolator-client-6.5.4.jar
-rw-r--r-- 1 zhanghuan staff 60K 1 7 23:12 lang-mustache-client-6.5.4.jar
-rw-r--r-- 1 zhanghuan staff 75K 1 7 23:13 parent-join-client-6.5.4.jar
-rw-r--r-- 1 zhanghuan staff 1.1M 1 7 23:16 hppc-0.7.1.jar
-rw-r--r-- 1 zhanghuan staff 258K 1 7 23:18 log4j-api-2.11.1.jar
-rw-r--r-- 1 zhanghuan staff 1.5M 1 7 23:19 log4j-core-2.11.1.jar
-rw-r--r-- 1 zhanghuan staff 50K 1 8 19:29 t-digest-3.2.jar
-rw-r--r--@ 1 zhanghuan staff 3.6M 1 8 19:38 netty-all-4.1.25.Final.jar
-rw-r--r--@ 1 zhanghuan staff 276K 1 8 19:42 jackson-core-2.8.11.jar
-rw-r--r--@ 1 zhanghuan staff 50K 1 8 19:42 jackson-dataformat-cbor-2.8.11.jar
-rw-r--r--@ 1 zhanghuan staff 72K 1 8 19:42 jackson-dataformat-smile-2.8.11.jar
-rw-r--r--@ 1 zhanghuan staff 40K 1 8 19:42 jackson-dataformat-yaml-2.8.11.jar
-rw-r--r-- 1 zhanghuan staff 28K 1 10 21:33 flume-ng-high-eslog-sink-1.8.jar
-rw-r--r--@ 1 zhanghuan staff 62K 1 12 19:35 log4j-1.2-api-2.11.1.jar
这里我们使用log4j2 配置如下
<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="WARN" packages="com.mine.log">
<Appenders>
<Console name="Console" target="SYSTEM_OUT">
<PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS}[%t] %-5p %c %msg%xEx%n" />
</Console>
<Kafka name="Kafka" topic="test1">
<JsonPartternLayout system = "mlog" pattern = "[%t] %c %msg%xEx%n"/>
<Property name="bootstrap.servers">localhost:9092</Property>
</Kafka>
</Appenders>
<Loggers>
<Root level="debug">
<AppenderRef ref="Console"/>
<AppenderRef ref="Kafka"/>
</Root>
</Loggers>
</Configuration>
测试代码
public class LogTest {
private static final Logger logger = LogManager.getLogger(LogTest.class);
@org.junit.Test
public void test() {
logger.info("输出信息……");
logger.trace("随意打印……");
logger.debug("调试信息……");
logger.warn( "警告信息……");
try {
new Thread(new Runnable() {
public void run() {
logger.warn("test……");
}
}).start();
LogTest.class.getClass().forName("123");
} catch (Exception e) {
logger.error("处理业务逻辑的时候发生一个错误……", e);
}
}
}
flume配置
#Name the components on this agent
agent.sources = r1
agent.sinks = k1
agent.channels = c1
#Describe/configure the source
agent.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
agent.sources.r1.channels = channel1
agent.sources.r1.batchSize = 5000
agent.sources.r1.batchDurationMillis = 2000
agent.sources.r1.kafka.bootstrap.servers = localhost:9092
agent.sources.r1.kafka.topics = test1
agent.sources.r1.kafka.consumer.group.id = custom.g.id
#Describe the sink
agent.sinks.k1.type = org.apache.flume.sink.hielasticsearch.ElasticSearchSink
agent.sinks.k1.hostNames = 127.0.0.1:9300
agent.sinks.k1.indexName = log_index
agent.sinks.k1.indexType = log_table
#agent.sinks.k1.clusterName = log_cluster
agent.sinks.k1.batchSize = 500
agent.sinks.k1.ttl = 5d
agent.sinks.k1.serializer = org.apache.flume.sink.hielasticsearch.ElasticSearchDynamicSerializer
#Use a channel which buffers events in memory
agent.channels.c1.type = memory
agent.channels.c1.capacity = 1000
agent.channels.c1.transactionCapacity = 100
#Bind the source and sink to the channel
agent.sources.r1.channels = c1
agent.sinks.k1.channel = c1
启动kafka flume elasticsearch kibana即可 配置相关基本下载即用我就略过了哈 日志展示如下
标签:bin eth header return mile bubuko windows 直接 tar
原文地址:https://www.cnblogs.com/adia/p/10261273.html