码迷,mamicode.com
首页 > 其他好文 > 详细

接上文,可设置并发数的版本

时间:2014-09-15 05:34:28      阅读:137      评论:0      收藏:0      [点我收藏+]

标签:style   blog   color   io   os   ar   for   art   div   

做了些微优化,并增加并发数控制:

    /// <summary>
    /// Base class for a message-queue handler that processes messages on thread-pool
    /// threads while capping the number of messages being processed at the same time.
    /// </summary>
    /// <remarks>
    /// A completed peek (<see cref="Produce"/>) queues one <see cref="Consume"/> work item.
    /// The auto-reset event makes the producer wait until the previously peeked message has
    /// actually been received, and the semaphore limits how many consumers run concurrently.
    /// </remarks>
    /// <typeparam name="T">Type the message body is cast to before it is handed to <see cref="MainProcess"/>.</typeparam>
    public abstract class MessageQueueConcurrentHandlerBase<T> : IMessageQueueHandler
    {
        /// <summary>
        /// Creates a handler bound to an existing queue. Reading does not begin until
        /// <see cref="StartRead"/> is called.
        /// </summary>
        /// <param name="queueName">Path of the queue; it must already exist.</param>
        /// <param name="maxConcurrency">Maximum number of messages processed at once; must be at least 1.</param>
        /// <param name="logDelegate">Sink for diagnostic messages. A null value disables logging.</param>
        /// <exception cref="InvalidOperationException">The queue does not exist.</exception>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="maxConcurrency"/> is less than 1.</exception>
        public MessageQueueConcurrentHandlerBase(string queueName, int maxConcurrency, Action<string> logDelegate)
        {
            if (!MessageQueue.Exists(queueName))
            {
                // More specific than a bare Exception; callers catching Exception are unaffected.
                throw new InvalidOperationException(string.Format("No such a queue: {0}", queueName));
            }

            if (maxConcurrency < 1)
            {
                throw new ArgumentOutOfRangeException("maxConcurrency");
            }

            this._queueName = queueName;
            // Starts with zero free slots; StartRead releases all of them.
            this._poolForConsumer = new Semaphore(0, maxConcurrency);
            this._producerAutoResetEvent = new AutoResetEvent(false);
            this._maxConcurrency = maxConcurrency;
            // A null delegate would otherwise fail with a NullReferenceException at the first
            // log call on a pool thread, so fall back to a no-op.
            this._logDelegate = logDelegate ?? delegate { };
        }

        /// <summary>
        /// Opens the queue, opens the producer gate and all consumer slots, and starts the
        /// first asynchronous peek.
        /// </summary>
        public void StartRead()
        {
            // The body is later cast to T, so T has to be a formatter target type. long was the
            // only type registered before and is kept so existing queues keep deserializing.
            Type[] targetTypes = typeof(T) == typeof(long)
                ? new Type[] { typeof(long) }
                : new Type[] { typeof(T), typeof(long) };

            this._queue = new MessageQueue(this._queueName) { Formatter = new XmlMessageFormatter(targetTypes) };
            this._queue.PeekCompleted += new PeekCompletedEventHandler(Produce);
            this._producerAutoResetEvent.Set();
            this._poolForConsumer.Release(this._maxConcurrency);

            this._queue.BeginPeek();
        }

        /// <summary>Returns the queue name and process name joined by an underscore.</summary>
        public override string ToString()
        {
            return string.Format("{0}_{1}", this._queueName, this.ProcessName);
        }

        /// <summary>Number of messages currently inside <see cref="MainProcess"/>.</summary>
        public int WorkerCount { get { return Thread.VolatileRead(ref this._workerCount); } }

        /// <summary>Concurrency limit passed to the constructor.</summary>
        public int MaxConcurrency { get { return _maxConcurrency; } }

        /// <summary>Name used to identify this handler in log output and in <see cref="ToString"/>.</summary>
        protected abstract string ProcessName { get; }

        /// <summary>Processes one message body. Exceptions thrown here are caught and logged.</summary>
        /// <param name="backThreadId">The message body cast to <typeparamref name="T"/>.</param>
        protected abstract void MainProcess(T backThreadId);

        /// <summary>Logging sink; never null (a no-op is substituted when none was supplied).</summary>
        protected Action<string> LogDelegate { get { return _logDelegate; } }

        #region private
        /// <summary>
        /// PeekCompleted handler: waits until the previously peeked message has been received,
        /// queues a consumer for the newly peeked one, and starts the next peek.
        /// </summary>
        private void Produce(object sender, PeekCompletedEventArgs e)
        {
            // Without this wait the same head-of-queue message would be peeked repeatedly and
            // more consumers queued than there are messages.
            this._producerAutoResetEvent.WaitOne();

            // Completes the asynchronous peek; the peeked message itself is not needed because
            // the consumer receives it from the queue.
            this._queue.EndPeek(e.AsyncResult);

            ThreadPool.QueueUserWorkItem(new WaitCallback(this.Consume));
            this._queue.BeginPeek();
        }

        /// <summary>
        /// Thread-pool work item: takes a concurrency slot, receives one message, lets the
        /// producer continue, and runs <see cref="MainProcess"/> on the message body.
        /// </summary>
        /// <param name="stateInfo">Unused; required by the WaitCallback signature.</param>
        private void Consume(object stateInfo)
        {
            this._poolForConsumer.WaitOne();

            bool producerSignaled = false;
            try
            {
                var message = this._queue.Receive();
                this._producerAutoResetEvent.Set();
                producerSignaled = true;

                T messageItem = (T)message.Body;

                this.LogDelegate(string.Format("{0} - Received a message, MessageItem = {1}", this.ProcessName, messageItem));
                Interlocked.Increment(ref this._workerCount);

                try
                {
                    this.LogDelegate(string.Format("{0} - Running - {1}, WorkerCount = {2}", this.ProcessName, messageItem, this.WorkerCount));
                    MainProcess(messageItem);
                }
                catch (Exception ex)
                {
                    this.HandleException(ex, messageItem);
                }
                finally
                {
                    Interlocked.Decrement(ref this._workerCount);

                    this.LogDelegate(string.Format("{0} - Over - {1}, WorkerCount = {2}", this.ProcessName, messageItem, this.WorkerCount));
                }
            }
            catch (Exception ex)
            {
                // Reached when receiving, deserializing or casting the message fails; log it the
                // same way processing failures are logged instead of losing the worker silently.
                this.LogDelegate(string.Format("Exception in {0} while receiving a message:[Message]={1},[StackTrace]={2},[Type]={3},[_workerCount]={4}", this.ProcessName, ex.Message, ex.StackTrace, ex.GetType(), this.WorkerCount));
            }
            finally
            {
                // If Receive failed the producer is still blocked in Produce; wake it so the
                // peek loop does not stall permanently.
                if (!producerSignaled)
                {
                    this._producerAutoResetEvent.Set();
                }

                // Always give the slot back; skipping this on a failure would permanently
                // reduce the available concurrency.
                this._poolForConsumer.Release();
            }
        }

        /// <summary>Logs an exception thrown by <see cref="MainProcess"/> together with the message body.</summary>
        private void HandleException(Exception ex, T messageItem)
        {
            this.LogDelegate(string.Format("Exception in {0}:[Message]={1},[StackTrace]={2},[Type]={3},[_workerCount]={4},[backThreadId]={5}", this.ProcessName, ex.Message, ex.StackTrace, ex.GetType(), this.WorkerCount, messageItem));
        }

        private readonly string _queueName;
        private MessageQueue _queue;
        private int _workerCount;
        private readonly Semaphore _poolForConsumer;
        private readonly AutoResetEvent _producerAutoResetEvent;
        private readonly int _maxConcurrency;
        private readonly Action<string> _logDelegate;
        #endregion
    }

觉得好就表扬一下啊:)

接上文,可设置并发数的版本

标签:style   blog   color   io   os   ar   for   art   div   

原文地址:http://www.cnblogs.com/bighuiwolf/p/3972091.html

(0)
(0)
   
举报
评论 一句话评论(0)
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!