码迷,mamicode.com
首页 > Web开发 > 详细

asp.net core mcroservices 架构之 分布式日志(三):集成kafka

时间:2019-01-02 23:30:39      阅读:238      评论:0      收藏:0      [点我收藏+]

标签:hit   处理   play   命名   replicate   分享   desc   margin   tar   

 

  一 kafka介绍                               

           kafka是基于zookeeper的一个分布式流平台,既然是流,那么大家都能猜到它的存储结构基本上就是线性的了。硬盘大家都知道读写非常的慢,那是因为在随机情况下,线性下,硬盘的读写非常快。kafka官方文档,一直拿传统的消息队列来和kafka对比,这样大家会触类旁通更快了解kafka的特性。最熟悉的消息队列框架有ActiveMQRabbitMQ.熟悉消息队列的,最熟悉的特性就是队列和发布订阅功能,因为这是大家最常用的,kafka实现了一些特有的机制,去规避传统的消息队列的一些瓶颈,比如并发,rabbitMQ在多个处理程序下,并不能保证执行顺序,还是必须自己去处理独占,而kafka使用consumer group的方式,实现了可以多个处理程序处理一个topic下的记录。如图:

技术分享图片

每个分区的记录保证能被每个组接受,这样可以并发去处理一个topic的记录,而且扩展组,则可以随意根据应用需求去扩展你的应用程序,但是每个组的消费者不能超过分区的数量。

kafka Distribution 提供了容错的功能,每一个partition都有一个服务器叫leader,还有零个或者一个以上的服务器叫follower,当这些follower都在同步数据的时候,leader扛起所有的写和读,当leader挂掉,follower会随机选取一个服务器当leader,当然必须有几个follower同步时 in-sync的。还有kafka虽然的那个记录具有原子性,但是并不支持事务。

因为这一篇并不是专门讲解kafka,所以点到为止。

      扩展服务 开发                          

     以前讲过,netcore的一个很重要的特性就是支持依赖注入,在这里一切皆服务。那么如果需要kafka作为日志服务的终端,就首先需要kafka服务,下面咱们就开发一个kafka服务。

首先,服务就是需要构建,这是netcore开发服务的第一步,我们首先建立一个IKafkaBuilder.cs接口类,如下:

 

homusing Microsoft.Extensions.DependencyInjection;

namespace Walt.Freamwork.Service
{
    public interface IKafkaBuilder
    {
         /// <summary>
        /// Gets the <see cref="IServiceCollection"/> where Logging services are configured.
        /// </summary>
        IServiceCollection Services { get; }
    }
}

再实现它,KafkaBuilder.cs

using Microsoft.Extensions.DependencyInjection;

namespace Walt.Freamwork.Service
{
    public class KafkaBuilder : IKafkaBuilder
    {
        public IServiceCollection Services {get;}

        public KafkaBuilder(IServiceCollection services)
        {
            Services=services;
        }
    }
}

再利用扩展方法为serviceCollection类加上扩展方法:

 using System;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions;
using Walt.Framework.Service.Kafka;

namespace Walt.Framework.Service
{
  
  
    public static class ServiceCollectionExtensions
    {
        /// <summary>
        /// Adds logging services to the specified <see cref="IServiceCollection" />.
        /// </summary>
        /// <param name="services">The <see cref="IServiceCollection" /> to add services to.</param>
        /// <returns>The <see cref="IServiceCollection"/> so that additional calls can be chained.</returns>
        public static IServiceCollection AddKafka(this IServiceCollection services)
        {
            return AddKafka(services, builder => { });
        }
 
        public static IServiceCollection AddKafka(this IServiceCollection services
        , Action<IKafkaBuilder> configure)
        {
            if (services == null)
            {
                throw new ArgumentNullException(nameof(services));
            }

            services.AddOptions(); 
            configure(new KafkaBuilder(services));
            services.TryAddSingleton<IKafkaService,KafkaService>();  //kafka的服务类
            return services;
        }
    }
}
KafkaService的实现:
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Confluent.Kafka;
using Microsoft.Extensions.Options;

namespace  Walt.Framework.Service.Kafka
{
    public class KafkaService : IKafkaService
    {

        private KafkaOptions _kafkaOptions;
        private Producer _producer;
        public KafkaService(IOptionsMonitor<KafkaOptions>  kafkaOptions)
        {
            _kafkaOptions=kafkaOptions.CurrentValue; 
            kafkaOptions.OnChange((kafkaOpt,s)=>{
                _kafkaOptions=kafkaOpt; 
                    System.Diagnostics.Debug
                    .WriteLine(Newtonsoft.Json.JsonConvert.SerializeObject(kafkaOpt)+"---"+s);
                    
            });
             _producer=new Producer(_kafkaOptions.Properties);
        }

        private byte[] ConvertToByte(string str)
        {
            return System.Text.Encoding.Default.GetBytes(str);
        }
 
        public  async Task<Message> Producer(string topic,string key,string value)
        {  
            if(string.IsNullOrEmpty(topic)
            ||string.IsNullOrEmpty(value))
            {
                throw new ArgumentNullException("topic或者value不能为null.");
            }
      
           var task=  await _producer.ProduceAsync(topic,ConvertToByte(key),ConvertToByte(value)); 
           return task;
        }
 
    }
}

 

那么咱们是不是忘记什么了,看上面的代码,是不是那个配置类KafkaOptions 还没有说明

再在技术分享图片这个位置添加kafka的配置类KafkaConfigurationOptions

using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Options;
using Walt.Freamwork.Service;

namespace Walt.Freamwork.Configuration
{
    public class KafkaConfigurationOptions : IConfigureOptions<KafkaOptions>
    {

        private readonly IConfiguration _configuration;


        public KafkaConfigurationOptions(IConfiguration configuration)
        {
           _configuration=configuration;
        }


        public void Configure(KafkaOptions options)
        {
                //这里仅仅自定义一些你自己的代码,使用上面configuration配置中的配置节,处理程序没法自动绑定的
                  一些事情。
        }
    }
}

然后,将配置类添加进服务:

using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions;
using Microsoft.Extensions.Options;
using Walt.Framework.Service;

namespace Walt.Framework.Configuration
{
    public static class KafkaConfigurationExtensioncs
    {
          public static IKafkaBuilder AddConfiguration(this IKafkaBuilder builder
          ,IConfiguration configuration)
          {
               
                InitService( builder,configuration); 
                return builder;
          }


          public static void InitService(IKafkaBuilder builder,IConfiguration configuration)
          {
            builder.Services.TryAddSingleton<IConfigureOptions<KafkaOptions>>(
                  new KafkaConfigurationOptions(configuration));  //配置类和配置内容

            builder.Services.TryAddSingleton
            (ServiceDescriptor.Singleton<IOptionsChangeTokenSource<KafkaOptions>>(
                  new ConfigurationChangeTokenSource<KafkaOptions>(configuration)) );//这个是观察类,如果更改,会激发onchange方法

            builder.Services
            .TryAddEnumerable(ServiceDescriptor.Singleton<IConfigureOptions<KafkaOptions>>
            (new ConfigureFromConfigurationOptions<KafkaOptions>(configuration))); //这个是option类,没这个,配置无法将类绑定
            
             builder.Services.AddSingleton(new KafkaConfiguration(configuration));
          }
    }
} 

 

ok,推送nuget,业务部分调用。

      kafka服务调用                          

在project中引用然后restore:

技术分享图片

引入命名空间:

技术分享图片

调用:

using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.AspNetCore;
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging; 
using Newtonsoft.Json;
using Walt.Framework.Log;
using Walt.Framework.Configuration;
using Walt.Framework.Service;

namespace Walt.TestMcroServoces.Webapi
{
    public class Program
    { 
        public static void Main(string[] args)
        { 
             
            var host = new WebHostBuilder()
            .ConfigureAppConfiguration((hostingContext, configContext) =>{
                 var en=hostingContext.HostingEnvironment;
                 if(en.IsDevelopment())
                 {
                     configContext.AddJsonFile($"appsettings.{en.EnvironmentName}.json");
                 }
                 else
                 {
                     configContext.AddJsonFile("appsettings.json");
                 }
                   configContext.AddCommandLine(args)
             .AddEnvironmentVariables()
             .SetBasePath(Directory.GetCurrentDirectory()).Build(); 
              
            }).ConfigureServices((context,configureServices)=>{
                   configureServices.AddKafka(KafkaBuilder=>{
                    KafkaBuilder.AddConfiguration(context.Configuration.GetSection("KafkaService"));
                   });
            })   //kafka的调用。
            .ConfigureLogging((hostingContext, logging) => {
 
                logging.AddConfiguration(hostingContext.Configuration.GetSection("Logging"))
                .AddCustomizationLogger();

            }).UseKestrel(KestrelServerOption=>{
                KestrelServerOption.ListenAnyIP(801);
            })
            .UseStartup<Startup>().Build(); 
            host.Run(); 
            Console.ReadKey();
        }
    }

}

 

 

然后提交git,让jenkins构建docker发布运行:

jenkin是是非常牛的一款构建工具,不仅仅根据插件可以扩展不同环境,还支持分布式构建.

 

技术分享图片

这是我们用jenikins构建的的:

技术分享图片

让它跑起来:

技术分享图片

调用看看:

技术分享图片

这个方法是输出Properties数组的:

技术分享图片

  四 集成kafka                         

kafka的接口不多,看看都有那些:

https://docs.confluent.io/current/clients/confluent-kafka-dotnet/api/Confluent.Kafka.Producer.html

技术分享图片

ConsumerProducer是咱们发布消息和调用消息的两个主类,代码在上文已经实现的service

客户端代码:

使用my-replicated-topic-morepart这儿topic,还是希望多分区,因为后面consumer使用分布式计算读取。

技术分享图片

consumer先在客户端监听:

技术分享图片

product端的调用代码:

技术分享图片

执行这个接口后,再看consumer接收到的消息:

技术分享图片

最后一步,将咱们kafka日志部分替换为真实的kafka环境,看结果:

技术分享图片

分布式日志到这里结束,可能大家觉得后面还有日志索引和日志展现,因为这个读kafka需要分布式去处理,

我下面刚好要写分布式计算的文章,所以到时可以拿这个当例子,承前继后。

 

using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Confluent.Kafka;
using Microsoft.Extensions.Options;

namespace Walt.Framework.Service.Kafka
{
public class KafkaService : IKafkaService
{

private KafkaOptions _kafkaOptions;
private Producer _producer;
public KafkaService(IOptionsMonitor<KafkaOptions> kafkaOptions)
{
_kafkaOptions=kafkaOptions.CurrentValue;
kafkaOptions.OnChange((kafkaOpt,s)=>{
_kafkaOptions=kafkaOpt;
System.Diagnostics.Debug
.WriteLine(Newtonsoft.Json.JsonConvert.SerializeObject(kafkaOpt)+"---"+s);
 
});
_producer=new Producer(_kafkaOptions.Properties);
}

private byte[] ConvertToByte(string str)
{
return System.Text.Encoding.Default.GetBytes(str);
}
 
public async Task<Message> Producer(string topic,string key,string value)
{
if(string.IsNullOrEmpty(topic)
||string.IsNullOrEmpty(value))
{
throw new ArgumentNullException("topic或者value不能为null.");
}
 
var task= await _producer.ProduceAsync(topic,ConvertToByte(key),ConvertToByte(value));
return task;
}
 
}
}

asp.net core mcroservices 架构之 分布式日志(三):集成kafka

标签:hit   处理   play   命名   replicate   分享   desc   margin   tar   

原文地址:https://www.cnblogs.com/ck0074451665/p/10211725.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!