标签:登录 option evel raise cti 业务 param router uil
基于Log4Net实现与Kafka通讯的Appender:
/// <summary>
/// log4net appender that renders each logging event through the configured
/// layout and publishes the resulting text to a Kafka topic.
/// </summary>
public class KafkaAppender : AppenderSkeleton
{
    #region Fields

    /// <summary>
    /// Broker address used when <see cref="Brokers" /> is not configured.
    /// 9092 is the port a Kafka broker listens on by default.
    /// </summary>
    private const string DefaultBrokers = "http://localhost:9092";

    /// <summary>
    /// Kafka producer; <c>null</c> until <see cref="ActivateOptions" /> succeeds
    /// and again after the appender has been closed.
    /// </summary>
    private Producer _kafkaProducer;

    #endregion Fields

    #region Properties

    /// <summary>
    /// Broker address(es) as URI strings. Several brokers may be listed,
    /// separated by <c>,</c> or <c>;</c>. Falls back to
    /// <c>http://localhost:9092</c> when left empty.
    /// </summary>
    public string Brokers { get; set; }

    /// <summary>
    /// Topic to publish to. When empty, the topic is derived from the
    /// logging event's domain name (without extension).
    /// </summary>
    public string Topic { get; set; }

    #endregion Properties

    #region Methods

    /// <summary>
    /// Initializes the appender based on the options set.
    /// </summary>
    /// <remarks>
    /// Must be called after the configuration properties have been set.
    /// Creates the Kafka producer if one does not exist yet; failures are
    /// reported through <c>ErrorHandler</c> instead of being thrown.
    /// </remarks>
    public override void ActivateOptions()
    {
        base.ActivateOptions();
        InitKafkaProducer();
    }

    /// <summary>
    /// Renders <paramref name="loggingEvent" /> and sends it to Kafka without
    /// waiting for the broker's acknowledgement.
    /// </summary>
    /// <param name="loggingEvent">The event to append.</param>
    /// <remarks>
    /// Never throws: synchronous failures and faulted send tasks are both
    /// forwarded to <c>ErrorHandler</c>.
    /// </remarks>
    protected override void Append(LoggingEvent loggingEvent)
    {
        try
        {
            var producer = _kafkaProducer;
            if (producer == null)
            {
                // Initialization failed or the appender is closed; report it
                // explicitly rather than surfacing a NullReferenceException.
                ErrorHandler.Error("KafkaProducer SendMessageAsync: producer is not initialized");
                return;
            }

            var message = GetLogMessage(loggingEvent);
            var topic = GetTopic(loggingEvent);

            // Fire-and-forget, but observe a faulted task so that asynchronous
            // send failures are reported instead of silently dropped.
            _ = producer
                .SendMessageAsync(topic, new[] { new Message(message) })
                .ContinueWith(
                    sendTask => ErrorHandler.Error("KafkaProducer SendMessageAsync", sendTask.Exception),
                    System.Threading.Tasks.TaskContinuationOptions.OnlyOnFaulted);
        }
        catch (Exception ex)
        {
            ErrorHandler.Error("KafkaProducer SendMessageAsync", ex);
        }
    }

    /// <summary>
    /// Releases the Kafka producer when the appender is closed.
    /// </summary>
    /// <remarks>
    /// It is a programming error to append to a closed appender.
    /// </remarks>
    protected override void OnClose()
    {
        base.OnClose();
        StopKafkaProducer();
    }

    /// <summary>
    /// Formats the event with the configured layout, appending the exception
    /// text when the layout itself ignores exceptions.
    /// </summary>
    /// <param name="loggingEvent">The event to render.</param>
    /// <returns>The rendered log text.</returns>
    /// <exception cref="InvalidOperationException">No layout is configured.</exception>
    private string GetLogMessage(LoggingEvent loggingEvent)
    {
        var layout = Layout;
        if (layout == null)
        {
            throw new InvalidOperationException("A layout must be set");
        }

        using (var writer = new StringWriter())
        {
            layout.Format(writer, loggingEvent);
            if (layout.IgnoresException && loggingEvent.ExceptionObject != null)
            {
                writer.Write(loggingEvent.GetExceptionString());
            }

            return writer.ToString();
        }
    }

    /// <summary>
    /// Resolves the topic: the configured <see cref="Topic" />, or the event's
    /// domain name without extension when no topic is configured.
    /// </summary>
    /// <param name="loggingEvent">The event being appended.</param>
    /// <returns>The topic name to publish to.</returns>
    private string GetTopic(LoggingEvent loggingEvent)
    {
        return string.IsNullOrEmpty(Topic) ? Path.GetFileNameWithoutExtension(loggingEvent.Domain) : Topic;
    }

    /// <summary>
    /// Creates the Kafka producer if it has not been created yet.
    /// Errors are reported through <c>ErrorHandler</c>.
    /// </summary>
    private void InitKafkaProducer()
    {
        try
        {
            if (string.IsNullOrWhiteSpace(Brokers))
            {
                Brokers = DefaultBrokers;
            }

            if (_kafkaProducer == null)
            {
                var kafkaOptions = new KafkaOptions(ParseBrokerUris(Brokers))
                {
                    Log = new KafkaLog()
                };
                _kafkaProducer = new Producer(new BrokerRouter(kafkaOptions));
            }
        }
        catch (Exception ex)
        {
            ErrorHandler.Error("InitKafkaProducer", ex);
        }
    }

    /// <summary>
    /// Splits a <c>,</c>/<c>;</c>-separated broker list into URIs.
    /// A single address (the original format) yields a one-element array.
    /// </summary>
    /// <param name="brokers">Broker list as configured.</param>
    /// <returns>One <see cref="Uri" /> per non-blank entry.</returns>
    /// <exception cref="FormatException">
    /// The list has no usable entry, or an entry is not a valid URI.
    /// </exception>
    private static Uri[] ParseBrokerUris(string brokers)
    {
        var uris = new System.Collections.Generic.List<Uri>();
        foreach (var part in brokers.Split(new[] { ',', ';' }, StringSplitOptions.RemoveEmptyEntries))
        {
            var address = part.Trim();
            if (address.Length > 0)
            {
                uris.Add(new Uri(address));
            }
        }

        if (uris.Count == 0)
        {
            throw new FormatException("Brokers does not contain any broker address.");
        }

        return uris.ToArray();
    }

    /// <summary>
    /// Stops and disposes the producer and clears the field so that a later
    /// <see cref="ActivateOptions" /> call can create a fresh one.
    /// </summary>
    private void StopKafkaProducer()
    {
        var producer = _kafkaProducer;
        _kafkaProducer = null;
        if (producer == null)
        {
            return;
        }

        try
        {
            // using guarantees Dispose even when Stop throws.
            using (producer)
            {
                producer.Stop();
            }
        }
        catch (Exception ex)
        {
            ErrorHandler.Error("StopKafkaProducer", ex);
        }
    }

    #endregion Methods
}
基于之前定义的ILogService接口,实现KafkaLogService:
/// <summary>
/// <see cref="ILogService" /> implementation that forwards every call to the
/// log4net logger named <see cref="KafkaLoggerName" />.
/// </summary>
public sealed class KafkaLogService : ILogService
{
    #region Fields

    /// <summary>
    /// Name under which the Kafka logger is looked up.
    /// </summary>
    public const string KafkaLoggerName = "KafkaLogger";

    /// <summary>
    /// Shared logger instance, resolved once by the static constructor.
    /// </summary>
    public static readonly ILog KafkaLogger;

    #endregion Fields

    #region Constructors

    /// <summary>
    /// Initializes the <see cref="KafkaLogService" /> class by resolving the
    /// logger from <c>LogManager</c>.
    /// </summary>
    static KafkaLogService()
    {
        KafkaLogger = LogManager.GetLogger(KafkaLoggerName);
    }

    #endregion Constructors

    #region Methods

    /// <summary>
    /// Writes a Debug entry if the Debug level is enabled.
    /// </summary>
    /// <param name="message">Log message.</param>
    public void Debug(string message)
    {
        if (!KafkaLogger.IsDebugEnabled)
        {
            return;
        }

        KafkaLogger.Debug(message);
    }

    /// <summary>
    /// Writes a Debug entry with an exception if the Debug level is enabled.
    /// </summary>
    /// <param name="message">Log message.</param>
    /// <param name="ex">Exception to record.</param>
    public void Debug(string message, Exception ex)
    {
        if (!KafkaLogger.IsDebugEnabled)
        {
            return;
        }

        KafkaLogger.Debug(message, ex);
    }

    /// <summary>
    /// Writes an Error entry if the Error level is enabled.
    /// </summary>
    /// <param name="message">Log message.</param>
    public void Error(string message)
    {
        if (!KafkaLogger.IsErrorEnabled)
        {
            return;
        }

        KafkaLogger.Error(message);
    }

    /// <summary>
    /// Writes an Error entry with an exception if the Error level is enabled.
    /// </summary>
    /// <param name="message">Log message.</param>
    /// <param name="ex">Exception to record.</param>
    public void Error(string message, Exception ex)
    {
        if (!KafkaLogger.IsErrorEnabled)
        {
            return;
        }

        KafkaLogger.Error(message, ex);
    }

    /// <summary>
    /// Writes a Fatal entry if the Fatal level is enabled.
    /// </summary>
    /// <param name="message">Log message.</param>
    public void Fatal(string message)
    {
        if (!KafkaLogger.IsFatalEnabled)
        {
            return;
        }

        KafkaLogger.Fatal(message);
    }

    /// <summary>
    /// Writes a Fatal entry with an exception if the Fatal level is enabled.
    /// </summary>
    /// <param name="message">Log message.</param>
    /// <param name="ex">Exception to record.</param>
    public void Fatal(string message, Exception ex)
    {
        if (!KafkaLogger.IsFatalEnabled)
        {
            return;
        }

        KafkaLogger.Fatal(message, ex);
    }

    /// <summary>
    /// Writes an Info entry if the Info level is enabled.
    /// </summary>
    /// <param name="message">Log message.</param>
    public void Info(string message)
    {
        if (!KafkaLogger.IsInfoEnabled)
        {
            return;
        }

        KafkaLogger.Info(message);
    }

    /// <summary>
    /// Writes an Info entry with an exception if the Info level is enabled.
    /// </summary>
    /// <param name="message">Log message.</param>
    /// <param name="ex">Exception to record.</param>
    public void Info(string message, Exception ex)
    {
        if (!KafkaLogger.IsInfoEnabled)
        {
            return;
        }

        KafkaLogger.Info(message, ex);
    }

    /// <summary>
    /// Writes a Warn entry if the Warn level is enabled.
    /// </summary>
    /// <param name="message">Log message.</param>
    public void Warn(string message)
    {
        if (!KafkaLogger.IsWarnEnabled)
        {
            return;
        }

        KafkaLogger.Warn(message);
    }

    /// <summary>
    /// Writes a Warn entry with an exception if the Warn level is enabled.
    /// </summary>
    /// <param name="message">Log message.</param>
    /// <param name="ex">Exception to record.</param>
    public void Warn(string message, Exception ex)
    {
        if (!KafkaLogger.IsWarnEnabled)
        {
            return;
        }

        KafkaLogger.Warn(message, ex);
    }

    #endregion Methods
}
修改Log4Net.Config,定义Kafka的Topic以及Brokers
<!-- Registers the KafkaAppender type from the MasterChief.DotNet.Core.KafkaLog assembly
     under the appender name "KafkaAppender". -->
<appender name="KafkaAppender" type="MasterChief.DotNet.Core.KafkaLog.KafkaAppender, MasterChief.DotNet.Core.KafkaLog">
<!-- Sets the appender's Topic property. -->
<param name="Topic" value="beats" />
<!-- Sets the appender's Brokers property (broker address as a URI). -->
<param name="Brokers" value="http://localhost:9092" />
<!-- PatternLayout: each entry prints timestamp, level, logger name and message on separate lines. -->
<layout type="log4net.Layout.PatternLayout">
<conversionPattern value="发生时间:%date %newline事件级别:%-5level %newline事件来源:%logger%newline日志内容:%message%newline" />
</layout>
</appender>
[开源]基于Log4Net简单实现KafkaAppender
标签:登录 option evel raise cti 业务 param router uil
原文地址:https://www.cnblogs.com/MeetYan/p/10693545.html