码迷,mamicode.com
首页 > 其他好文 > 详细

并发任务管理器AsyncTaskManager

时间:2017-08-13 01:09:17      阅读:176      评论:0      收藏:0      [点我收藏+]

标签:new   read   collect   creat   generic   []   while   ons   enum   

//-------------------------------------------------------------------------- 
//  
//  Copyright (c) BUSHUOSX.  All rights reserved.  
//  
//  File: AsyncTaskManager.cs 
//
//  Version:1.0.0.0
//
//  Datetime:20170812
// 
//-------------------------------------------------------------------------- 


using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

namespace BUSHUOSX
{
    class AsyncTaskManager
    {
        /// <summary>
        /// 缓存的任务队列
        /// </summary>
        readonly Queue<Task> _taskQueue = new Queue<Task>();

        /// <summary>
        /// 工作锁,保护_taskQueue
        /// </summary>
        SpinLock _workLock;

        ///// <summary>
        ///// 入队锁
        ///// </summary>
        //SpinLock _taskEnQueueLock;
        ///// <summary>
        ///// 出队锁
        ///// </summary>
        //SpinLock _taskDeQueueLock;

        /// <summary>
        /// 工作信号,与MaxConcurrencyLevel控制并行量
        /// </summary>
        SemaphoreSlim _workSemaphore;

        /// <summary>
        /// 工作线程取消标志
        /// </summary>
        CancellationTokenSource ctsCancel;
        /// <summary>
        /// 工作线程
        /// </summary>
        Task _worker;

        /// <summary>
        /// 工作器状态
        /// </summary>
        private bool IsWorking { get; set; }

        /// <summary>
        /// 任务最大并发量
        /// </summary>
        public int MaxConcurrencyLevel { get; }

        /// <summary>
        /// 内部工作器将在队列中有任务时自动启动。否则由Start方法启动。
        /// </summary>
        public bool AutoRunWorker { get; }

        /// <summary>
        /// 工作器每一次清空队列任务,都调用
        /// </summary>
        private Action<Task> _callbackOnAllTaskComplited;

        /// <summary>
        /// 队列中的任务任务完成时,都将调用
        /// </summary>
        private Action<Task> _callbackOnAnyTaskComplited;

        /// <summary>
        /// 控制异步任务的并发量。
        /// 注意:只能严格控制stauts为Created的任务
        /// </summary>
        /// <param name="maxConcurrencyLevel">最大并发数</param>
        /// <param name="callbackOnAnyTaskComplited">如果不为null,则队列中的任何任务完成时都将传递给此回调</param>
        /// <param name="callbackOnAllTaskComplited">如果不为null,则内部队列为空时传递工作器给此回调</param>
        /// <param name="autoRunWorker">指示内部工作器将在队列中有任务时自动启动,还是由Start方法启动。</param>
        public AsyncTaskManager(int maxConcurrencyLevel, Action<Task> callbackOnAnyTaskComplited = null, Action<Task> callbackOnAllTaskComplited = null, bool autoRunWorker = true)
        {
            _callbackOnAnyTaskComplited = callbackOnAnyTaskComplited;
            _callbackOnAllTaskComplited = callbackOnAllTaskComplited;
            AutoRunWorker = autoRunWorker;
            MaxConcurrencyLevel = maxConcurrencyLevel < 0 ? int.MaxValue : maxConcurrencyLevel;
        }

        /// <summary>
        /// 排入一个任务到内部队列,该队列中的任务将被依次调用。
        /// </summary>
        /// <param name="task">要排队的任务。注意:只能严格控制stauts为Created的任务</param>
        /// <param name="callbackOnTaskComplited">此任务完成时回调</param>
        public void QueueTask(Task task, Action<Task> callbackOnTaskComplited = null)
        {
            if (task == null) return;
            if (null == callbackOnTaskComplited)
            {
                EnqueueTask(task);
            }
            else
            {
                EnqueueTask(task.ContinueWith(callbackOnTaskComplited));
            }
            if (AutoRunWorker)
            {
                notifyStartWork();
            }
        }

        //public void QueueTask(IEnumerable<Task> tasks, Action<Task> callbackOnTaskComplited = null)
        //{
        //    foreach (var item in tasks)
        //    {
        //        if (item == null) break;
        //        if (null == callbackOnTaskComplited)
        //        {
        //            EnqueueTask(item);
        //        }
        //        else
        //        {
        //            EnqueueTask(item.ContinueWith(callbackOnTaskComplited));
        //        }
        //    }
        //    if (AutoRunWorker)
        //    {
        //        notifyStartWork();
        //    }
        //}

        /// <summary>
        /// 返回此刻队列中的任务。
        /// </summary>
        /// <returns></returns>
        public Task[] GetQueueTask()
        {
            bool gotlock = false;
            try
            {
                _workLock.Enter(ref gotlock);
                if (_taskQueue.Count > 0)
                {
                    return _taskQueue.ToArray();
                }
                else
                {
                    return null;
                }
            }
            finally
            {
                _workLock.Exit();
            }
        }

        /// <summary>
        /// 启动内部工作器。
        /// 注意:为降低资源占用,该工作器在内部队列为空时会自动退出。
        /// </summary>
        public void Start()
        {
            notifyStartWork();
        }

        /// <summary>
        /// 挂起队列中剩余的任务。稍后可以使用Continue方法继续。
        /// </summary>
        public void Suspend()
        {
            stopWorkThread(false);
        }

        /// <summary>
        /// 停止工作器,并清空内部任务队列还未调用的任务。
        /// 已调用的任务还将继续运行。
        /// </summary>
        public void Cancel()
        {
            stopWorkThread(true);
        }


        private void stopWorkThread(bool clearTasks)
        {
            if (IsWorking)
            {
                ctsCancel.Cancel();
                if (clearTasks)
                {
                    bool gotlock = false;
                    try
                    {
                        _workLock.Enter(ref gotlock);
                        _taskQueue.Clear();
                    }
                    finally
                    {
                        if (gotlock)
                        {
                            _workLock.Exit();
                        }
                    }
                }
            }
        }

        /// <summary>
        /// 继续之前挂起的任务。
        /// </summary>
        public void Continue()
        {
            notifyStartWork();
        }


        /// <summary>
        /// 内部启动工作器
        /// </summary>
        private void notifyStartWork()
        {
            if (IsWorking) return;

            //初始化
            ctsCancel = new CancellationTokenSource();
            //_taskDeQueueLock = new SpinLock();
            //_taskEnQueueLock = new SpinLock();
            _workLock = new SpinLock();
            _workSemaphore = new SemaphoreSlim(MaxConcurrencyLevel, MaxConcurrencyLevel);

            //_worker = Task.Run(new Action(workerThread), ctsStop.Token);
            _worker = Task.Run(new Action(workerThread));
            _worker.ContinueWith((t) => { notifyEndWork(); });

            IsWorking = true;
        }

        /// <summary>
        /// 工作器结束时调用
        /// </summary>
        private void notifyEndWork()
        {
            if (IsWorking)
            {
                //ctsCancel = null;

                _callbackOnAllTaskComplited?.Invoke(_worker);

                IsWorking = false;
                Debug.WriteLine("工作线程结束……");
            }
        }

        /// <summary>
        /// 任务完成时调用
        /// </summary>
        /// <param name="task"></param>
        private void anyTaskComplited(Task task)
        {
            _workSemaphore.Release();
            //todo task
            _callbackOnAnyTaskComplited?.Invoke(task);
            //Debug.WriteLine("完成任务{0}:{1}", task.Id, task.Status.ToString());
        }

        /// <summary>
        /// 工作器线程执行方法。只应存在一个。
        /// </summary>
        private void workerThread()
        {
            Debug.WriteLine("工作线程启动……");

            Task tmp = null;
            while (true)
            {
                try
                {
                    _workSemaphore.Wait(ctsCancel.Token);
                }
                catch (OperationCanceledException e)
                {
                    break;
                }

                tmp = DequeueTask();
                if (tmp != null)
                {
                    if (tmp.Status == TaskStatus.Created)
                    {
                        tmp.Start();
                    }
                    tmp.ContinueWith(anyTaskComplited);
                }
                else
                {
                    Debug.WriteLine("workerAsync:null taskQueue");
                    break;
                }
            }
        }

        /// <summary>
        /// 排入任务,期望线程安全
        /// </summary>
        /// <param name="task"></param>
        private void EnqueueTask(Task task)
        {
            bool gotlock = false;
            try
            {
                _workLock.Enter(ref gotlock);
                _taskQueue.Enqueue(task);
            }
            finally
            {
                if (gotlock) _workLock.Exit();
            }

        }

        /// <summary>
        /// 弹出任务,期望线程安全
        /// </summary>
        /// <returns></returns>
        private Task DequeueTask()
        {
            bool gotlock = false;
            try
            {
                _workLock.Enter(ref gotlock);
                if (_taskQueue.Count > 0)
                {
                    return _taskQueue.Dequeue();
                }
                else
                {
                    return null;
                }
            }
            finally
            {
                if (gotlock) _workLock.Exit();
            }
        }
    }
}

 

并发任务管理器AsyncTaskManager

标签:new   read   collect   creat   generic   []   while   ons   enum   

原文地址:http://www.cnblogs.com/bushuosx/p/7352131.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!