码迷,mamicode.com
首页 > 其他好文 > 详细

说下hangfire吧

时间:2020-03-14 00:48:10      阅读:106      评论:0      收藏:0      [点我收藏+]

标签:日志   Once   param   any   除了   dstat   das   throw   queue   

最近因工作需要开发计划任务模块(严格来说应该是修改bug吧,其他同事负责的)接触到了Hangfire。早前听同事说hangfire有点坑,怀着好奇,趁这两天bug改的差不多了,在github上面down了hangfire源码,下面分享一下,自己读hangfire源码的一些理解,和工作中需要注意的地方。介绍大概分为以下几个部分吧。1.准备工作,2.简单使用,3.源码分析,4.避坑。需要说明一下接触hangfire源码的时间不长,也就几天时间理解不到位,或者说错了的,希望在评论指正。
准备工作:hangfire源代码的代码量不多,github地址: https://github.com/HangfireIO/Hangfire,有兴趣的朋友可以自己下载瞅瞅源码。功能上大概可以分为客户端模式和服务端模式。用到的技术大概有Multi Thread、Expression、Dapper、Cron等。可以这么说,它的定时任务完全就是基于多线程协作实现的。因为是多线程环境,所以个人觉得看起来有点费力。
简单使用:.Net&.Net Core环境都可以使用,下面就以.Net Core的使用为例。
1.客户端和服务端独立部署
client端
 1 public IServiceProvider ConfigureServices(IServiceCollection services)
 2         {
 3             // 其他代码
 4              services.AddHangfire(config =>
 5             {
 6                  config.UseSqlServerStorage(...);
 7             });
 8         }
 9  
10 public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
11         {
12             // 其他代码...
13             // 启用Dashboard看板
14             app.UseHangfireDashboard();
15         }

 

server端
 1 public void Configuration(IAppBuilder app)
 2         {
 3             GlobalConfiguration.Configuration
 4                  .UseSqlServerStorage("连接字符串", new SqlServerStorageOptions
 5                 {
 6                     // options
 7                 });
 8             app.UseHangfireServer(new BackgroundJobServerOptions
 9             {
10             });
11         }
12  
13  
或者
1 services.AddHangfireServer(options =>
2                     {
3                         // 基于IHostedService接口实现
4                     });
PS:server端还有一种实现方式,实现IHostedService接口 其实跟上面的使用方法一样的,注入到服务就ok,在程序启动阶段会自动执行IHostedService接口的两个方法,可以简单看下IHostedService接口的定义。
1   public interface IHostedService
2   {
3     Task StartAsync(CancellationToken cancellationToken);
4     Task StopAsync(CancellationToken cancellationToken);
5   }
接口就定义了两个方法,start是在程序启动的时候执行,当然stop就是在程序停止的时候执行。我们用一张图简单描绘一下它的执行时机,图是盗的。
技术图片技术图片
以上就是hangfire的client端和server端分开部署的一个简单应用,下面我们看下第二种,client&server部署在同一台机器上。
2.客户端和服务端统一部署
1 public void Configuration(IAppBuilder app)
2 {
3     GlobalConfiguration.Configuration.UseSqlServerStorage(); // 配置数据库连接
4     
5     app.UseHangfireServer(); // 启用server
6     app.UseHangfireDashboard(); // 启用看板
7 }

 

简单的几行代码,当然我也只会简单的用法。以上服务注入并执行,接下来就是往hangfire里面添加任务。
1 BackgroundJob.Enqueue(() => Console.WriteLine("Simple!")); // 立即执行
2 BackgroundJob.Schedule(() => Console.WriteLine("Reliable!"), TimeSpan.FromDays(7)); // 延后执行
3 RecurringJob.AddOrUpdate(() => Console.WriteLine("Transparent!"), Cron.Daily); // 循环执行,支持cron表达式
简单使用就到这吧,我们继续大纲的第三部分,源码分析。
 
源码分析
客户端模式就不用说了,说白了就是往hangfire数据库里面写任务,我们主要是看看服务端的执行原理。我们先找到入口,也可以看做是NetCore里面的一个中间件吧。看代码
1 app.UseHangfireServer(); // 启用server
UseHangfireServer实现
 1 public static IAppBuilder UseHangfireServer(
 2             [NotNull] this IAppBuilder builder,
 3             [NotNull] JobStorage storage,
 4             [NotNull] BackgroundJobServerOptions options,
 5             [NotNull] params IBackgroundProcess[] additionalProcesses)
 6         {
 7             // 其他代码...
 8             var server = new BackgroundJobServer(options, storage,  additionalProcesses); 
 9             
10             return builder;
11         }

 

UseHangfireServer扩展方法实现里面,比较重要的一行代码就是创建BackgroundJobServer,BackgroundJobServer实现了IBackgroundProcessingServer接口,server的启动也就是间接在它的构造器里面完成的。我们不妨先瞅瞅IBackgroundProcessingServer接口和BackgroundJobServer类的定义。
 1 // IBackgroundProcessingServer
 2 public interface IBackgroundProcessingServer : IDisposable
 3     {
 4         void SendStop();
 5         bool WaitForShutdown(TimeSpan timeout);
 6         Task WaitForShutdownAsync(CancellationToken cancellationToken);
 7     }
 8  
 9 // BackgroundJobServer
10 public class BackgroundJobServer : IBackgroundProcessingServer
11     {
12         // 其他成员...
13         public BackgroundJobServer(
14             [NotNull] BackgroundJobServerOptions options,
15             [NotNull] JobStorage storage,
16             [NotNull] IEnumerable<IBackgroundProcess> additionalProcesses,
17             [CanBeNull] IJobFilterProvider filterProvider,
18             [CanBeNull] JobActivator activator,
19             [CanBeNull] IBackgroundJobFactory factory,
20             [CanBeNull] IBackgroundJobPerformer performer,
21             [CanBeNull] IBackgroundJobStateChanger stateChanger)
22         {
23             // 其他代码
24             var processes = new List<IBackgroundProcessDispatcherBuilder>();
25             processes.AddRange(GetRequiredProcesses(filterProvider, activator,  factory, performer, stateChanger));
26             processes.AddRange(additionalProcesses.Select(x =>  x.UseBackgroundPool(1)));
27             var properties = new Dictionary<string, object>
28             {
29                 { "Queues", options.Queues },
30                 { "WorkerCount", options.WorkerCount }
31             };
32             
33             _processingServer = new BackgroundProcessingServer(
34                 storage,
35                 processes,
36                 properties,
37                 GetProcessingServerOptions());
38         }
39         public void SendStop()
40         {
41         }
42         public void Dispose()
43         {
44         }
45         [Obsolete("This method is a stub. There is no need to call the `Start`  method. Will be removed in version 2.0.0.")]
46         public void Start()
47         {
48         }
49         [Obsolete("Please call the `Shutdown` method instead. Will be removed in  version 2.0.0.")]
50         public void Stop()
51         {
52         }
53         [Obsolete("Please call the `Shutdown` method instead. Will be removed in  version 2.0.0.")]
54         public void Stop(bool force)
55         {
56         }
57         public bool WaitForShutdown(TimeSpan timeout)
58         {
59         }
60         public Task WaitForShutdownAsync(CancellationToken cancellationToken)
61         {
62         }

 

IBackgroundProcessingServer接口里面的这几个方法都是跟停用server,取消任务清理资源相关的。BackgroundJobServer类里面真正完成接口的实现是由BackgroundProcessingServer类型的同名函数实现,这个对象是在构造函数里面初始化的,在初始化BackgroundProcessingServer类型的同时,创建了若干IBackgroundProcessDispatcherBuilder实现类BackgroundProcessDispatcherBuilder的实例,hangfire默认实现了7种dispatcher,我们任务、日志、心跳等等独立线程都是由它的Create方法完成,这个地方不算server启动主线,会在后面细说。我们继续看看BackgroundProcessingServer这个类型。这里需要注意的是里面有几个方法好像是被停用了,start、stop等方法,官方也注释了,被删除了。start方法被停用了,难道我们的server启动是在BackgroundProcessingServer类型里面?继续看BackgroundProcessingServer的定义。
 1 public sealed class BackgroundProcessingServer : IBackgroundProcessingServer
 2     {
 3         // 其他成员
 4         internal BackgroundProcessingServer(
 5             [NotNull] BackgroundServerProcess process,
 6             [NotNull] BackgroundProcessingServerOptions options)
 7         {
 8             _process = process ?? throw new ArgumentNullException(nameof(process));
 9             _options = options ?? throw new ArgumentNullException(nameof(options));
10             _dispatcher = CreateDispatcher();
11 #if !NETSTANDARD1_3
12             AppDomain.CurrentDomain.DomainUnload += OnCurrentDomainUnload;
13             AppDomain.CurrentDomain.ProcessExit += OnCurrentDomainUnload;
14 #endif
15         }
16         public void SendStop()
17         {
18         }
19         public bool WaitForShutdown(TimeSpan timeout)
20         {
21         }
22         public async Task WaitForShutdownAsync(CancellationToken cancellationToken)
23         {
24         }
25         public void Dispose()
26         {
27             
28         }
29         private void OnCurrentDomainUnload(object sender, EventArgs args)
30         {
31             
32         }
33         private IBackgroundDispatcher CreateDispatcher()
34         {
35             var execution = new BackgroundExecution(
36                 _stoppingCts.Token,
37                 new BackgroundExecutionOptions
38                 {
39                     Name = nameof(BackgroundServerProcess),
40                     ErrorThreshold = TimeSpan.Zero,
41                     StillErrorThreshold = TimeSpan.Zero,
42                     RetryDelay = retry => _options.RestartDelay
43                 });
44             return new BackgroundDispatcher(
45                 execution,
46                 RunServer,
47                 execution,
48                 ThreadFactory);
49         }
50         private void RunServer(Guid executionId, object state)
51         {
52             _process.Execute(executionId, (BackgroundExecution)state,  _stoppingCts.Token, _stoppedCts.Token, _shutdownCts.Token);
53         }
54         private static IEnumerable<Thread> ThreadFactory(ThreadStart threadStart)
55         {
56             yield return new Thread(threadStart)
57             {
58                 IsBackground = true,
59                 Name = $"{nameof(BackgroundServerProcess)}  #{Interlocked.Increment(ref _lastThreadId)}",
60             };
61         }
62     }

 

果不其然,server的启动快要揭开神秘的面纱了,RunServer?翻译过来应该是启动服务吧,我们暂且不去管他,先记一下这个有个runserver,我们继续跟踪。在构造函数里面调用了一个CreateDispatcher()的方法,我们看下它的实现
 1 private IBackgroundDispatcher CreateDispatcher()
 2         {
 3             var execution = new BackgroundExecution(
 4                 _stoppingCts.Token,
 5                 new BackgroundExecutionOptions
 6                 {
 7                     Name = nameof(BackgroundServerProcess),
 8                     ErrorThreshold = TimeSpan.Zero,
 9                     StillErrorThreshold = TimeSpan.Zero,
10                     RetryDelay = retry => _options.RestartDelay
11                 });
12             return new BackgroundDispatcher(
13                 execution,
14                 RunServer,
15                 execution,
16                 ThreadFactory);
17         }

 

在CreateDispatcher方法里面返回了一个BackgroundDispatcher,字面意思好像是后台分发器,并且指定了回调runserver,BackgroundDispatcher实现了IBackgroundDispatcher接口,我们先看下它们的定义。
 1 // IBackgroundDispatcher
 2 public interface IBackgroundDispatcher : IDisposable
 3     {
 4         bool Wait(TimeSpan timeout);
 5         Task WaitAsync(TimeSpan timeout, CancellationToken cancellationToken);
 6     }
 7  
 8 // BackgroundDispatcher
 9 internal sealed class BackgroundDispatcher : IBackgroundDispatcher
10     {
11         // 其他成员
12         public BackgroundDispatcher(
13             [NotNull] IBackgroundExecution execution,
14             [NotNull] Action<Guid, object> action,
15             [CanBeNull] object state,
16             [NotNull] Func<ThreadStart, IEnumerable<Thread>> threadFactory)
17         {
18             if (threadFactory == null) throw new  ArgumentNullException(nameof(threadFactory));
19             _execution = execution ?? throw new  ArgumentNullException(nameof(execution));
20             _action = action ?? throw new ArgumentNullException(nameof(action));
21             _state = state;
22 #if !NETSTANDARD1_3
23             AppDomainUnloadMonitor.EnsureInitialized();
24 #endif
25             var threads = threadFactory(DispatchLoop)?.ToArray();
26             if (threads == null || threads.Length == 0)
27             {
28                 throw new ArgumentException("At least one unstarted thread should be  created.", nameof(threadFactory));
29             }
30             if (threads.Any(thread => thread == null || (thread.ThreadState &  ThreadState.Unstarted) == 0))
31             {
32                 throw new ArgumentException("All the threads should be non-null and  in the ThreadState.Unstarted state.", nameof(threadFactory));
33             }
34             _stopped = new CountdownEvent(threads.Length);
35             foreach (var thread in threads)
36             {
37                 thread.Start();
38             }
39         }
40         public bool Wait(TimeSpan timeout)
41         {
42             return _stopped.WaitHandle.WaitOne(timeout);
43         }
44         public async Task WaitAsync(TimeSpan timeout, CancellationToken  cancellationToken)
45         {
46             await _stopped.WaitHandle.WaitOneAsync(timeout,  cancellationToken).ConfigureAwait(false);
47         }
48         public void Dispose()
49         {
50         }
51         public override string ToString()
52         {
53         }
54         private void DispatchLoop()
55         {
56             try
57             {
58                 _execution.Run(_action, _state);
59             }
60             catch (Exception ex)
61             {
62  
63             }
64             finally
65             {
66                 try
67                 {
68                     _stopped.Signal();
69                 }
70                 catch (ObjectDisposedException)
71                 {
72  
73                 }
74             }
75         }
76     }

 

从IBackgroundDispatcher接口的定义来看,分发器应该是负责协调资源处理,我们具体看看BackgroundDispatcher的实现。以上代码就是server的启动执行核心代码并且我以加粗,其实就是启动线程Loop执行。在DispatchLoop方法里面间接调用了我上面说的runserver方法。在runserver方法里面实现了整个server端的初始化工作。我们接着看DispatchLoop方法的实现 ,在这个方法里面调用了IBackgroundExecution接口的run方法,继续IBackgroundExecution接口的定义。
1 public interface IBackgroundExecution : IDisposable
2     {
3         void Run([NotNull] Action<Guid, object> callback, [CanBeNull] object  state);
4         Task RunAsync([NotNull] Func<Guid, object, Task> callback, [CanBeNull]  object state);
5     }

 

就两方法,run包含同步和异步,看看它的唯一实现类BackgroundExecution。
 1   internal sealed class BackgroundExecution : IBackgroundExecution
 2     {
 3                 // 其他成员
 4         public void Run(Action<Guid, object> callback, object state)
 5         {
 6             if (callback == null) throw new ArgumentNullException(nameof(callback));
 7             var executionId = Guid.NewGuid();
 8            
 9             {
10 #if !NETSTANDARD1_3
11                 try
12 #endif
13                 {
14                     HandleStarted(executionId, out var nextDelay);
15                     while (true)
16                     {
17                         // Don‘t place anything here.
18                         try
19                         {
20                            
21                             if (StopRequested) break;
22                             if (nextDelay > TimeSpan.Zero)
23                             {
24                                 HandleDelay(executionId, nextDelay);
25                             }
26                             callback(executionId, state);
27                             HandleSuccess(out nextDelay);
28                         }
29 #if !NETSTANDARD1_3
30                         catch (ThreadAbortException) when  (AppDomainUnloadMonitor.IsUnloading)
31                         {
32                             // Our thread is aborted due to AppDomain unload. It‘s  better to give up to
33                             // not to cause the host to be more aggressive.
34                             throw;
35                         }
36 #endif
37                         catch (OperationCanceledException) when (StopRequested)
38                         {
39                             break;
40                         }
41                         catch (Exception ex)
42                         {
43                             HandleException(executionId, ex, out nextDelay);
44                         }
45                     }
46                     HandleStop(executionId);
47                 }
48 #if !NETSTANDARD1_3
49                 catch (ThreadAbortException ex)
50                 {
51                     HandleThreadAbort(executionId, ex);
52                 }
53 #endif
54             }
55         }
56 }

 

hangfire里面所有的独立线程都是通过run方法执行,然后回调到自己的实现类Execute方法,自此每个独立的功能线程就循环干着自己独立的工作(这个后面会单独分析RecurringJobScheduler)。继续我们的主线,server启动,我们以run的同步方法为例,第一个线程(我们就叫它主线程吧)启动了一个while循环,在循环里面并且callback调用了我们的runserver方法。
    
1 private void RunServer(Guid executionId, object state)
2         {
3             _process.Execute(executionId, (BackgroundExecution)state,  _stoppingCts.Token, _stoppedCts.Token, _shutdownCts.Token);
4         }

 

在runserver方法里面的实现很简单,直接调用了_process的execute方法,我们简单看下_process类型IBackgroundServerProcess的定义。
1 internal interface IBackgroundServerProcess
2     {
3         void Execute(
4             Guid executionId,
5             BackgroundExecution execution,
6             CancellationToken stoppingToken,
7             CancellationToken stoppedToken,
8             CancellationToken shutdownToken);
9     }

 

IBackgroundServerProcess的定义就一个execute方法,这个接口的工作其实就是初始化server服务端,我们看看它的唯一实现类BackgroundServerProcess。
  1 internal sealed class BackgroundServerProcess : IBackgroundServerProcess
  2     {
  3         
  4         // 其他成员
  5         public BackgroundServerProcess(
  6             [NotNull] JobStorage storage,
  7             [NotNull] IEnumerable<IBackgroundProcessDispatcherBuilder> dispatcherBuilders,
  8             [NotNull] BackgroundProcessingServerOptions options,
  9             [NotNull] IDictionary<string, object> properties)
 10         {
 11             if (dispatcherBuilders == null) throw new ArgumentNullException(nameof(dispatcherBuilders));
 12  
 13  
 14             _storage = storage ?? throw new ArgumentNullException(nameof(storage));
 15             _options = options ?? throw new ArgumentNullException(nameof(options));
 16             _properties = properties ?? throw new ArgumentNullException(nameof(properties));
 17  
 18  
 19             var builders = new List<IBackgroundProcessDispatcherBuilder>();
 20             builders.AddRange(GetRequiredProcesses()); // 添加默认的工作dispatcher也就是独立线程
 21             builders.AddRange(GetStorageComponents());
 22             builders.AddRange(dispatcherBuilders);
 23  
 24  
 25             _dispatcherBuilders = builders.ToArray();
 26         }
 27  
 28  
 29         public void Execute(Guid executionId, BackgroundExecution execution, CancellationToken stoppingToken,
 30             CancellationToken stoppedToken, CancellationToken shutdownToken)  // server初始化
 31         {
 32             var serverId = GetServerId();
 33             Stopwatch stoppedAt = null;
 34  
 35  
 36             void HandleRestartSignal()
 37             {
 38                 if (!stoppingToken.IsCancellationRequested)
 39                 {
 40                     _logger.Info($"{GetServerTemplate(serverId)} caught restart signal...");
 41                 }
 42             }
 43             using (var restartCts = new CancellationTokenSource())
 44             using (var restartStoppingCts = CancellationTokenSource.CreateLinkedTokenSource(stoppingToken, restartCts.Token))
 45             using (var restartStoppedCts = CancellationTokenSource.CreateLinkedTokenSource(stoppedToken, restartCts.Token))
 46             using (var restartShutdownCts = CancellationTokenSource.CreateLinkedTokenSource(shutdownToken, restartCts.Token))
 47             using (restartStoppingCts.Token.Register(HandleStopRestartSignal))
 48             using (stoppingToken.Register(HandleStoppingSignal))
 49             using (stoppedToken.Register(HandleStoppedSignal))
 50             using (shutdownToken.Register(HandleShutdownSignal))
 51             using (restartCts.Token.Register(HandleRestartSignal))
 52             {
 53                 var context = new BackgroundServerContext(
 54                     serverId,
 55                     _storage,
 56                     _properties,
 57                     restartStoppingCts.Token,
 58                     restartStoppedCts.Token,
 59                     restartShutdownCts.Token);
 60                 var dispatchers = new List<IBackgroundDispatcher>();
 61                 CreateServer(context);
 62                 try
 63                 {
 64                     // ReSharper disable once AccessToDisposedClosure
 65                     using (var heartbeat = CreateHeartbeatProcess(context, () => restartCts.Cancel())) // 创建守护线程
 66                     {
 67                         StartDispatchers(context, dispatchers); // 启动hangfire默认初始化的所有独立任务线程
 68                         execution.NotifySucceeded();
 69                         WaitForDispatchers(context, dispatchers);
 70  
 71  
 72                         restartCts.Cancel();
 73  
 74                         heartbeat.WaitAsync(Timeout.InfiniteTimeSpan, shutdownToken).GetAwaiter().GetResult();
 75                     }
 76                 }
 77                 finally
 78                 {
 79                     DisposeDispatchers(dispatchers);
 80                     ServerDelete(context, stoppedAt);
 81                 }
 82             }
 83         }
 84  
 85  
 86         private IBackgroundDispatcher CreateHeartbeatProcess(BackgroundServerContext context, Action requestRestart) // 创建守护线程
 87         {
 88             return new ServerHeartbeatProcess(_options.HeartbeatInterval, _options.ServerTimeout, requestRestart)
 89                 .UseBackgroundPool(threadCount: 1)
 90                 .Create(context, _options);
 91         }
 92  
 93  
 94         private IEnumerable<IBackgroundProcessDispatcherBuilder> GetRequiredProcesses() // 初始化日志和任务监控线程
 95         {
 96             yield return new ServerWatchdog(_options.ServerCheckInterval, _options.ServerTimeout).UseBackgroundPool(threadCount: 1);
 97             yield return new ServerJobCancellationWatcher(_options.CancellationCheckInterval).UseBackgroundPool(threadCount: 1);
 98         }
 99         private string GetServerId() // 获取serverid
100         {
101             var serverName = _options.ServerName
102                  ?? Environment.GetEnvironmentVariable("COMPUTERNAME")
103                  ?? Environment.GetEnvironmentVariable("HOSTNAME");
104             var guid = Guid.NewGuid().ToString();
105  
106             return !String.IsNullOrWhiteSpace(serverName) ? $"{serverName.ToLowerInvariant()}:{guid}" : guid;
107         }
108  
109         
110         private void CreateServer(BackgroundServerContext context) // 创建server,写入Server数据表
111         {
112             var stopwatch = Stopwatch.StartNew();
113             using (var connection = _storage.GetConnection())
114             {
115                 connection.AnnounceServer(context.ServerId, GetServerContext(_properties));
116             }
117             stopwatch.Stop();
118  
119  
120             ServerJobCancellationToken.AddServer(context.ServerId);
121             _logger.Info($"{GetServerTemplate(context.ServerId)} successfully announced in {stopwatch.Elapsed.TotalMilliseconds} ms");
122         }
123  
124  
125         private void StartDispatchers(BackgroundServerContext context, ICollection<IBackgroundDispatcher> dispatchers) // 启动所有独立的任务线程,包括我们的队列计划、循环计划、日志、守护等等线程
126         {
127  
128             foreach (var dispatcherBuilder in _dispatcherBuilders)
129             {
130                 dispatchers.Add(dispatcherBuilder.Create(context, _options));
131             }
132         }
133  
134     }

 

以上代码我有做精简处理,不要纠结里面的实现,代码注释也比较详细。下面我做一个简单的总结吧,第一个线程(暂时叫主线程吧)从startup里面调用usehangfireserver扩展方法-》启动一个新的worker线程用于初始化&启动server-》主程返回-》启动hangfire所有任务线程-》创建的第一个worker线程挂起(用于处理所有任务线程的资源释放)。server的初始化工作大概就是这些,下面详细看看hangfire的任务线程的执行原理,这里我们以RecurringJobScheduler循环任务为例。
 
RecurringJobScheduler实现机制
还记得上面提到的7个dispatcher任务线程的创建吗?这7个默认的任务线程初始化就发生在上面加粗的代码里面StartDispatchers方法,我们看代码。
1 private void StartDispatchers(BackgroundServerContext context,  ICollection<IBackgroundDispatcher> dispatchers)
2         {
3                // 其他代码...
4             foreach (var dispatcherBuilder in _dispatcherBuilders)
5             {
6                 dispatchers.Add(dispatcherBuilder.Create(context, _options)); // 初始化独立任务线程
7             }
8         }

 

遍历_dispatcherBuilders数组,7种任务类型,分别调用它们的Create方法。继续看create方法。
   
 1  public IBackgroundDispatcher Create(BackgroundServerContext context,  BackgroundProcessingServerOptions options) // 第一步
 2         {
 3             // 其他代码
 4             var execution = new BackgroundExecution(
 5                 context.StoppingToken,
 6                 new BackgroundExecutionOptions
 7                 {
 8                     Name = _process.GetType().Name,
 9                     RetryDelay = options.RetryDelay
10                 }); // 定义自己的execution
11             return new BackgroundDispatcher( // 创建BackgroundDispatcher 
12                 execution,
13                 ExecuteProcess, // 指定回调
14                 Tuple.Create(_process, context, execution), // 创建三元组上下文,注意一下1元组这个对象
15                 _threadFactory);
16         }
17  
18 public BackgroundDispatcher(  // 第二步
19             [NotNull] IBackgroundExecution execution,
20             [NotNull] Action<Guid, object> action,
21             [CanBeNull] object state,
22             [NotNull] Func<ThreadStart, IEnumerable<Thread>> threadFactory)
23         {
24    
25             _state = state;
26  
27             var threads = threadFactory(DispatchLoop)?.ToArray();
28            
29             foreach (var thread in threads)
30             {
31                 thread.Start(); // 执行线程
32             }
33         }
34  
35 private void DispatchLoop() // 第三步
36         {
37             try
38             {
39                 _execution.Run(_action, _state);  // 在run里面回调_action
40             }
41             catch (Exception ex)
42             {
43             }
44             finally
45             {
46                 try
47                 {
48                     _stopped.Signal();
49                 }
50                 catch (ObjectDisposedException)
51                 {
52                 }
53             }
54         }
55  
56 private static void ExecuteProcess(Guid executionId, object state) // 第四步 回调方法,对应上面的指定回调
57         {
58             var tuple = (Tuple<IBackgroundProcess, BackgroundServerContext,  BackgroundExecution>)state;
59             var serverContext = tuple.Item2;
60             var context = new BackgroundProcessContext( // 创建公共上下文
61                 serverContext.ServerId,
62                 serverContext.Storage,
63                 serverContext.Properties.ToDictionary(x => x.Key, x => x.Value),
64                 executionId,
65                 serverContext.StoppingToken,
66                 serverContext.StoppedToken,
67                 serverContext.ShutdownToken);
68             while (!context.IsStopping)
69             {
70                 tuple.Item1.Execute(context); // 执行自己元组对应的实例
71                 tuple.Item3.NotifySucceeded();
72             }
73         }

 

上面有点乱啊,我大概简单串起来说一下。第一步在create方法里面创建了BackgroundDispatcher并指定了元组参数-》第二步绑定线程的执行函数Loop并且执行-》第三步执行Loop并且回调_action委托-》第四步_action参数对应的函数地址就是ExecuteProcess,最后在ExecuteProcess里面通过元组参数调用对应的任务类型,自此7种任务类型启动并开始工作。以上代码还有个细节需要说明一下,Tuple.Create(_process, context, execution)。元组的第一个参数,其类型为IBackgroundProcess,看下定义。
1 public interface IBackgroundProcess : IServerProcess
2     {
3         void Execute([NotNull] BackgroundProcessContext context);
4     }

 

接口就定义了一个方法,没什么特别的,但是它的几个实现类就是我们单独的任务类,我们下面要说的RecurringJobScheduler循环任务类也实现了这个接口。到此我们的RecurringJobScheduler循环定时任务线程就算开始执行了。
RecurringJobScheduler循环定时任务机制
照旧看下这个类型的定义
 1 public class RecurringJobScheduler : IBackgroundProcess
 2     {
 3         // 其他代码
 4         public RecurringJobScheduler(
 5             [NotNull] IBackgroundJobFactory factory,
 6             TimeSpan pollingDelay,
 7             [NotNull] ITimeZoneResolver timeZoneResolver,
 8             [NotNull] Func<DateTime> nowFactory)
 9         {
10             if (factory == null) throw new ArgumentNullException(nameof(factory));
11             if (nowFactory == null) throw new ArgumentNullException(nameof(nowFactory));
12             if (timeZoneResolver == null) throw new ArgumentNullException(nameof(timeZoneResolver));
13  
14  
15             _factory = factory;
16             _nowFactory = nowFactory;
17             _timeZoneResolver = timeZoneResolver;
18             _pollingDelay = pollingDelay;
19             _profiler = new SlowLogProfiler(_logger);
20         }
21  
22  
23         /// <inheritdoc />
24         public void Execute(BackgroundProcessContext context) // 实现方法
25         {
26             if (context == null) throw new ArgumentNullException(nameof(context));
27  
28  
29             var jobsEnqueued = 0;
30  
31  
32             while (EnqueueNextRecurringJobs(context)) // 从数据库获取定时任务
33             {
34                 jobsEnqueued++;
35  
36  
37                 if (context.IsStopping)
38                 {
39                     break;
40                 }
41             }
42  
43  
44             if (jobsEnqueued != 0)
45             {
46                 _logger.Debug($"{jobsEnqueued} recurring job(s) enqueued.");
47             }
48  
49  
50             if (_pollingDelay > TimeSpan.Zero)
51             {
52                 context.Wait(_pollingDelay);
53             }
54             else
55             {
56                 var now = _nowFactory();
57                 context.Wait(now.AddMilliseconds(-now.Millisecond).AddSeconds(-now.Second).AddMinutes(1) - now);
58             }
59         }
60     }

 

承上,调用元组的第一个参数的execute方法,RecurringJobScheduler的execute方法得以执行,该方法就干一件事,每隔15秒从数据库获取待执行的计划,每次1000条数据。通过EnqueueNextRecurringJobs方法获取任务。
 1 private bool EnqueueNextRecurringJobs(BackgroundProcessContext context)
 2         {
 3             return UseConnectionDistributedLock(context.Storage, connection => 
 4             {
 5                 var result = false;
 6                 if (IsBatchingAvailable(connection))
 7                 {
 8                     var now = _nowFactory();
 9                     var timestamp = JobHelper.ToTimestamp(now);
10                     var recurringJobIds =  ((JobStorageConnection)connection).GetFirstByLowestScoreFromSet("recurring-jobs", 0,  timestamp, BatchSize); // 从数据库里面查询
11                     if (recurringJobIds == null || recurringJobIds.Count == 0) return  false;
12                     foreach (var recurringJobId in recurringJobIds)
13                     {
14                         if (context.IsStopping) return false;
15                         if (TryEnqueueBackgroundJob(context, connection, recurringJobId,  now))// 排队执行
16                         {
17                             result = true;
18                         }
19                     }
20                 }
21                 else
22                 {
23                     for (var i = 0; i < BatchSize; i++)
24                     {
25                         if (context.IsStopping) return false;
26                         var now = _nowFactory();
27                         var timestamp = JobHelper.ToTimestamp(now);
28                         var recurringJobId =  connection.GetFirstByLowestScoreFromSet("recurring-jobs", 0, timestamp);
29                         if (recurringJobId == null) return false;
30                         if (!TryEnqueueBackgroundJob(context, connection, recurringJobId,  now))
31                         {
32                             return false;
33                         }
34                     }
35                 }
36                 return result;
37             });
38         }

 

GetFirstByLowestScoreFromSet方法从数据库Set表里面查询top1000数据,条件是key为recurring-jobs字符串(表示定时任务)并且 时间范围是0到当前时间。随后遍历这些jobids,排队执行,往下看TryEnqueueBackgroundJob方法的实现。
 1 private bool EnqueueBackgroundJob(
 2             BackgroundProcessContext context,
 3             IStorageConnection connection,
 4             string recurringJobId,
 5             DateTime now)
 6         {
 7             // 其他代码
 8             using (connection.AcquireDistributedRecurringJobLock(recurringJobId,  LockTimeout))
 9             {
10                 try
11                 {
12                     var recurringJob = connection.GetRecurringJob(recurringJobId,  _timeZoneResolver, now);
13                     if (recurringJob == null)
14                     {
15                         using (var transaction = connection.CreateWriteTransaction())
16                         {
17                             transaction.RemoveFromSet("recurring-jobs", recurringJobId);
18                             transaction.Commit();
19                         }
20                         return false;
21                     }
22           
23                     BackgroundJob backgroundJob = null;
24                     IReadOnlyDictionary<string, string> changedFields;
25                     if (recurringJob.TrySchedule(out var nextExecution, out var error))
26                     {
27                         if (nextExecution.HasValue && nextExecution <= now)
28                         {
29                             backgroundJob = _factory.TriggerRecurringJob(context.Storage,  connection, _profiler, recurringJob, now);
30                             if (String.IsNullOrEmpty(backgroundJob?.Id))
31                             {
32                                 _logger.Debug($"Recurring job ‘{recurringJobId}‘ execution  at ‘{nextExecution}‘ has been canceled.");
33                             }
34                         }
35                         recurringJob.IsChanged(out changedFields, out nextExecution);
36                     }
37                     else if (recurringJob.RetryAttempt < MaxRetryAttemptCount)
38                     {
39                         var delay = _pollingDelay > TimeSpan.Zero ? _pollingDelay :  TimeSpan.FromMinutes(1);
40                         
41                         _logger.WarnException($"Recurring job ‘{recurringJobId}‘ can‘t be  scheduled due to an error and will be retried in {delay}.", error);
42                         recurringJob.ScheduleRetry(delay, out changedFields, out  nextExecution);
43                     }
44                     else
45                     {
46                         _logger.ErrorException($"Recurring job ‘{recurringJobId}‘ can‘t be  scheduled due to an error and will be disabled.", error);
47                         recurringJob.Disable(error, out changedFields, out nextExecution);
48                     }
49               
50                     using (var transaction = connection.CreateWriteTransaction())
51                     {
52                         if (backgroundJob != null)
53                         {
54                             _factory.StateMachine.EnqueueBackgroundJob(
55                                 context.Storage,
56                                 connection,
57                                 transaction,
58                                 recurringJob,
59                                 backgroundJob,
60                                 "Triggered by recurring job scheduler",
61                                 _profiler);
62                         }
63                         transaction.UpdateRecurringJob(recurringJob, changedFields,  nextExecution, _logger);
64                         transaction.Commit();
65                         return true;
66                     }
67                 }
68                 catch (TimeZoneNotFoundException ex)
69                 {
70                 catch (Exception ex)
71                 {
72    
73                 }
74                 return false;
75             }
76         }

 

需要注意的地方我都有加粗,该方法大概流程是:1.GetRecurringJob根据jobid从Hash表里面查询一条完整的定时任务,2.TrySchedule获取该任务的下次执行时间,如果下次执行时间小于当前,执行这条任务(并非真正执行定时任务,只是往job表里面写数据,真正执行任务由worker完成),3.获取下次执行时间&所有任务字段,4.状态机修改任务状态。定时任务就这样周而复始的重复执行以上流程。这里简单说下worker的执行机制,其实际就是轮询检索job表里面的数据执行任务表达式树,worker在hangfire里面默认开启了20个线程。第三部分就到这吧。
 
避坑
简单说下个人在改bug期间遇到的一些问题啊。
1.时区问题,在添加定时任务时如果不指定时区信息,默认使用的是utc时间,我们中国是东8区,也就是说解析出来的执行时间会晚8个小时执行。解决办法有几种可以通过全局指定options的ITimeZoneResolver属性指定,也可以通过AddorUpdate方法指定,如果是指定时区信息,需要注意看板上面的异常信息,如果有异常会导致任务不执行,时区信息它是从系统里面检索出来的,没有就抛异常。就这样吧。
 
 
 
 
 
 

说下hangfire吧

标签:日志   Once   param   any   除了   dstat   das   throw   queue   

原文地址:https://www.cnblogs.com/adair-blog/p/12490042.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!