标签:rabbitmq sub wrap pool tty 订阅模式 接口 thread 事件处理
EventBus 是一种事件发布/订阅模式,借助 EventBus 我们可以很好地实现组件之间、服务之间、系统之间的解耦,并解决它们相互通信的问题。
EventBus 相当于是定义一些抽象接口,可以用 MQ 来实现EventBus
1、模块的预处理:定义预处理方法,把实现了 ILocalEventHandler、IDistributedEventHandler 的服务添加到配置
LocalEventBusOptions、DistributedEventBusOptions 中,二者分别用 ITypeList<IEventHandler> Handlers { get; } 存储这些处理器类型
2、IEventHandler 接口,分为 ILocalEventHandler<in TEvent> 和 IDistributedEventHandler
/// <summary>
/// Implement this method to handle the event.
/// </summary>
/// <param name="eventData">Event data</param>
/// <returns>A task that represents the handling of the event.</returns>
Task HandleEventAsync(TEvent eventData);
IEventHandlerFactory,负责得到或创建事件处理器,三个实现IocEventHandlerFactory,SingleInstanceHandlerFactory,TransientEventHandlerFactory
// Factory member (listed under IEventHandlerFactory in the surrounding text):
// hands out an event handler wrapped in a disposable.
IEventHandlerDisposeWrapper GetHandler();

/// <summary>
/// A disposable wrapper that exposes an event handler.
/// </summary>
public interface IEventHandlerDisposeWrapper : IDisposable
{
    /// <summary>The wrapped event handler.</summary>
    IEventHandler EventHandler { get; }
}

/// <summary>
/// Default wrapper: holds the handler and runs an optional callback on dispose.
/// </summary>
public class EventHandlerDisposeWrapper : IEventHandlerDisposeWrapper
{
    // Callback run by Dispose; may be null, in which case Dispose does nothing.
    private readonly Action _disposeAction;

    /// <summary>
    /// Creates a wrapper around <paramref name="eventHandler"/>.
    /// </summary>
    /// <param name="eventHandler">The handler to expose.</param>
    /// <param name="disposeAction">Optional callback invoked when the wrapper is disposed.</param>
    public EventHandlerDisposeWrapper(IEventHandler eventHandler, Action disposeAction = null)
    {
        EventHandler = eventHandler;
        _disposeAction = disposeAction;
    }

    /// <inheritdoc />
    public IEventHandler EventHandler { get; }

    /// <summary>
    /// Invokes the dispose callback if one was supplied.
    /// </summary>
    public void Dispose()
    {
        if (_disposeAction != null)
        {
            _disposeAction();
        }
    }
}
3、EventBus:用来发布/订阅/取消订阅事件
/// <summary>
/// Publishes an event.
/// </summary>
/// <param name="eventType">Type of the event.</param>
/// <param name="eventData">Data of the event.</param>
public abstract Task PublishAsync(Type eventType, object eventData);

/// <summary>
/// Subscribes a handler factory to an event type.
/// </summary>
/// <param name="eventType">Type of the event.</param>
/// <param name="factory">Factory used to obtain the handler.</param>
/// <returns>
/// A disposable handle for the subscription
/// (presumably disposing it unsubscribes; confirm against the implementation).
/// </returns>
public abstract IDisposable Subscribe(Type eventType, IEventHandlerFactory factory);

/// <summary>
/// Unsubscribes a delegate-based handler from the event <typeparamref name="TEvent"/>.
/// </summary>
/// <param name="action">The delegate to unsubscribe.</param>
public abstract void Unsubscribe<TEvent>(Func<TEvent, Task> action) where TEvent : class;

/// <summary>
/// Unsubscribes a handler instance from an event type.
/// </summary>
/// <param name="eventType">Type of the event.</param>
/// <param name="handler">The handler to unsubscribe.</param>
public abstract void Unsubscribe(Type eventType, IEventHandler handler);

/// <summary>
/// Unsubscribes a handler factory from an event type.
/// </summary>
/// <param name="eventType">Type of the event.</param>
/// <param name="factory">The factory to unsubscribe.</param>
public abstract void Unsubscribe(Type eventType, IEventHandlerFactory factory);

/// <summary>
/// Unsubscribes everything registered for an event type.
/// </summary>
/// <param name="eventType">Type of the event.</param>
public abstract void UnsubscribeAll(Type eventType);
二、AbpRabbitMqModule
/// <summary>
/// Module that binds the RabbitMQ options from configuration and disposes the
/// channel and connection pools when the application shuts down.
/// </summary>
[DependsOn(
    typeof(AbpJsonModule),
    typeof(AbpThreadingModule)
)]
public class AbpRabbitMqModule : AbpModule
{
    /// <summary>
    /// Binds <c>AbpRabbitMqOptions</c> to the "RabbitMQ" configuration section.
    /// </summary>
    public override void ConfigureServices(ServiceConfigurationContext context)
    {
        var configuration = context.Services.GetConfiguration();
        Configure<AbpRabbitMqOptions>(configuration.GetSection("RabbitMQ"));
    }

    /// <summary>
    /// Resolves the channel pool and the connection pool and disposes them,
    /// channel pool first.
    /// </summary>
    public override void OnApplicationShutdown(ApplicationShutdownContext context)
    {
        context.ServiceProvider
            .GetRequiredService<IChannelPool>()
            .Dispose();

        context.ServiceProvider
            .GetRequiredService<IConnectionPool>()
            .Dispose();
    }
}
三、AbpEventBusRabbitMqModule
/// <summary>
/// Serializes the event data and publishes it to the configured exchange,
/// using the event name as the routing key.
/// </summary>
/// <param name="eventType">Type of the event; its event name becomes the routing key.</param>
/// <param name="eventData">Event payload to serialize and send.</param>
/// <returns>An already-completed task; the publish itself runs synchronously.</returns>
public override Task PublishAsync(Type eventType, object eventData)
{
    var routingKey = EventNameAttribute.GetNameOrDefault(eventType);
    var payload = Serializer.Serialize(eventData);

    var connection = ConnectionPool.Get(RabbitMqEventBusOptions.ConnectionName);
    using (var model = connection.CreateModel())
    {
        // Declare the durable "direct" exchange before publishing to it.
        model.ExchangeDeclare(
            RabbitMqEventBusOptions.ExchangeName,
            "direct",
            durable: true
        );

        // Mark the message with the persistent delivery mode.
        var messageProperties = model.CreateBasicProperties();
        messageProperties.DeliveryMode = RabbitMqConsts.DeliveryModes.Persistent;

        model.BasicPublish(
            exchange: RabbitMqEventBusOptions.ExchangeName,
            routingKey: routingKey,
            mandatory: true,
            basicProperties: messageProperties,
            body: payload
        );
    }

    return Task.CompletedTask;
}
/// <summary>
/// Registers a handler factory for the event type and, when it is the first
/// factory for that type, binds the consumer to the event's name.
/// </summary>
/// <param name="eventType">Type of the event to subscribe to.</param>
/// <param name="factory">Factory used to obtain the handler.</param>
/// <returns>An <c>EventHandlerFactoryUnregistrar</c> created for this bus, event type and factory.</returns>
public override IDisposable Subscribe(Type eventType, IEventHandlerFactory factory)
{
    var handlerFactories = GetOrCreateHandlerFactories(eventType);
    handlerFactories.Add(factory);

    // TODO: Multi-threading! (the Add and the Count check are not performed atomically)
    if (handlerFactories.Count == 1)
    {
        // NOTE(review): the result of BindAsync is not awaited here; confirm that
        // fire-and-forget is intended.
        Consumer.BindAsync(EventNameAttribute.GetNameOrDefault(eventType));
    }

    return new EventHandlerFactoryUnregistrar(this, eventType, factory);
}
标签:rabbitmq sub wrap pool tty 订阅模式 接口 thread 事件处理
原文地址:https://www.cnblogs.com/cloudsu/p/11230985.html