码迷,mamicode.com
首页 > 其他好文 > 详细

EventBus

时间:2019-07-23 13:02:52      阅读:358      评论:0      收藏:0      [点我收藏+]

标签:rabbitmq   sub   wrap   pool   tty   订阅模式   接口   thread   事件处理   

EventBus 是一种事件发布订阅模式,借助 EventBus 我们可以很好的实现组件之间,服务之间,系统之间的解耦以及相互通信的问题。

 技术图片

EventBus 相当于是定义一些抽象接口,可以用 MQ 来实现EventBus

1、模块的预处理模块,定义预处理方法,增加实现ILocalEventHandler,IDistributedEventHandler的服务增加到配置

LocalEventBusOptions、DistributedEventBusOptions,分别存储ITypeList<IEventHandler> Handlers { get; }

2、IEventHandle接口,分ILocalEventHandler<in TEvent>,IDistributedEventHandler

        /// <summary>
        /// Handler handles the event by implementing this method.
        /// </summary>
        /// <param name="eventData">Event data</param>
        Task HandleEventAsync(TEvent eventData);

IEventHandlerFactory,负责得到或创建事件处理器,三个实现IocEventHandlerFactory,SingleInstanceHandlerFactory,TransientEventHandlerFactory

 IEventHandlerDisposeWrapper GetHandler();


  public interface IEventHandlerDisposeWrapper : IDisposable
    {
        IEventHandler EventHandler { get; }
    }

  public class EventHandlerDisposeWrapper : IEventHandlerDisposeWrapper
    {
        public IEventHandler EventHandler { get; }

        private readonly Action _disposeAction;

        public EventHandlerDisposeWrapper(IEventHandler eventHandler, Action disposeAction = null)
        {
            _disposeAction = disposeAction;
            EventHandler = eventHandler;
        }

        public void Dispose()
        {
            _disposeAction?.Invoke();
        }
    }

 

3、EventBus:用来发布/订阅/取消订阅事件

技术图片

public abstract Task PublishAsync(Type eventType, object eventData);

public abstract IDisposable Subscribe(Type eventType, IEventHandlerFactory factory);


 public abstract void Unsubscribe<TEvent>(Func<TEvent, Task> action) where TEvent : class;

 public abstract void Unsubscribe(Type eventType, IEventHandler handler);
 public abstract void Unsubscribe(Type eventType, IEventHandlerFactory factory);
public abstract void Unsubscribe(Type eventType, IEventHandlerFactory factory);
public abstract void UnsubscribeAll(Type eventType);

二、AbpRabbitMqModule

 [DependsOn(
        typeof(AbpJsonModule),
        typeof(AbpThreadingModule)
        )]
    public class AbpRabbitMqModule : AbpModule
    {
        public override void ConfigureServices(ServiceConfigurationContext context)
        {
            var configuration = context.Services.GetConfiguration();
            Configure<AbpRabbitMqOptions>(configuration.GetSection("RabbitMQ"));
        }

        public override void OnApplicationShutdown(ApplicationShutdownContext context)
        {
            context.ServiceProvider
                .GetRequiredService<IChannelPool>()
                .Dispose();

            context.ServiceProvider
                .GetRequiredService<IConnectionPool>()
                .Dispose();
        }

三、AbpEventBusRabbitMqModule

public override Task PublishAsync(Type eventType, object eventData)
        {
            var eventName = EventNameAttribute.GetNameOrDefault(eventType);
            var body = Serializer.Serialize(eventData);

            using (var channel = ConnectionPool.Get(RabbitMqEventBusOptions.ConnectionName).CreateModel())
            {
                channel.ExchangeDeclare(
                    RabbitMqEventBusOptions.ExchangeName,
                    "direct",
                    durable: true
                );
                
                var properties = channel.CreateBasicProperties();
                properties.DeliveryMode = RabbitMqConsts.DeliveryModes.Persistent;

                channel.BasicPublish(
                   exchange: RabbitMqEventBusOptions.ExchangeName,
                    routingKey: eventName,
                    mandatory: true,
                    basicProperties: properties,
                    body: body
                );
            }

            return Task.CompletedTask;
        }
        public override IDisposable Subscribe(Type eventType, IEventHandlerFactory factory)
        {
            var handlerFactories = GetOrCreateHandlerFactories(eventType);
            
            handlerFactories.Add(factory);

            if (handlerFactories.Count == 1) //TODO: Multi-threading!
            {
                Consumer.BindAsync(EventNameAttribute.GetNameOrDefault(eventType));
            }

            return new EventHandlerFactoryUnregistrar(this, eventType, factory);
        }

 

EventBus

标签:rabbitmq   sub   wrap   pool   tty   订阅模式   接口   thread   事件处理   

原文地址:https://www.cnblogs.com/cloudsu/p/11230985.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!