码迷,mamicode.com
首页 > Web开发 > 详细

ASP.NET中进行消息处理(MSMQ) 三

时间:2014-10-17 01:02:23      阅读:300      评论:0      收藏:0      [点我收藏+]

标签:style   blog   http   io   os   使用   ar   for   strong   

在本文的前两篇文章里对MSMQ的相关知识点进行了介绍,很多阅读过这前两篇文章的朋友都曾问到过这样一些问题:
  1、如何把MSMQ应用到实际的项目中去呢?
  2、可不可以介绍一个实际的应用实例?
  3、......

      在前两篇文章里,关于MSMQ常用的技术点基本介绍完毕了,本文主要以MS开源项目PetShop中的MSMQ应用作为案例来介绍MSMQ在实际项目中的应用。在PetShop里,由于系统使用了多线程的专用应用程序来监控消息队列,在进入PetShop应用分析前,我们先来了解下关于多线程和MSMQ的相关知识点。

一、多线程和MSMQ
      现在有这样一个需求,指定的消息队列里不管有无消息数据,我们通过一个多线程来监控这个队列,一但队列里的数据发生变化就做出相应的处理,比如把消息读取出来。根据这个需求,我们来做个示例,用一多线程把队列监控起来,如果队列里有消息数据,就把消息读取出来,如果没有则一直监视队列,当队列数据发生改变(有新的消息加入)的时候就作出处理(读取消息)。

      首先定义一个线程数组用于存储线程数;

1bubuko.com,布布扣static private int ThreadNumber = 5;  //5个线程序
2bubuko.com,布布扣static private Thread[] ThreadArray = new Thread[ThreadNumber];

     我们把需要启动的线程装载入ThreadArray数组,通过一个遍历数组把所以的线程启动,实际这里只有5个线程。

bubuko.com,布布扣
 1bubuko.com,布布扣private void button1_Click(object sender, EventArgs e)
 2bubuko.com,布布扣{
 3bubuko.com,布布扣    StartThreads();
 4bubuko.com,布布扣}
 5bubuko.com,布布扣
 6bubuko.com,布布扣private void StartThreads()
 7bubuko.com,布布扣{
 8bubuko.com,布布扣    int counter; //线程计数
 9bubuko.com,布布扣    for (counter = 0; counter < ThreadNumber; counter++)
10bubuko.com,布布扣    {
11bubuko.com,布布扣        ThreadArray[counter] = new Thread(new ThreadStart(MSMQListen));
12bubuko.com,布布扣        ThreadArray[counter].Start();
13bubuko.com,布布扣        this.richTextBox2.Text += (counter + 1).ToString() + "号线程开始!";
14bubuko.com,布布扣    }
15bubuko.com,布布扣}
16bubuko.com,布布扣
17bubuko.com,布布扣private void MSMQListen()
18bubuko.com,布布扣{
19bubuko.com,布布扣    while (true)
20bubuko.com,布布扣    {
21bubuko.com,布布扣        //取出队列里的消息
22bubuko.com,布布扣        MessageBox.Show(MsgQueue.ReceiveMessage());
23bubuko.com,布布扣    }
24bubuko.com,布布扣}
bubuko.com,布布扣

      如果上面这段代码阅读起存在问题,建议先去了解下多线程的相关知识点。在StartThreads方法里启动数组里存储的所以线程,并委托给MSMQListen方法进行处理,MSMQListen方法完成的就是读取队列里的消息,这里我使用了在第二篇文章里所使用的MsgQueue类和Book类,详细请阅读第二篇文章ASP.NET中进行消息处理(MSMQ) 二 。
                              bubuko.com,布布扣
      启动了5个线程,用来监视指定的消息队列,如上图。那好,我们现在就来测试一下,通过给队列里发送消息,看线程是否会有响应。从上面启动线程的代码上可以很清晰的看出,只要队列里有消息在多线程的监视下,线程就会把队列里的消息读取出来。

bubuko.com,布布扣
 1bubuko.com,布布扣private void button3_Click(object sender, EventArgs e)
 2bubuko.com,布布扣{
 3bubuko.com,布布扣    Book book = new Book();
 4bubuko.com,布布扣    book.BookId = 1;
 5bubuko.com,布布扣    book.BookName = "asp.net";
 6bubuko.com,布布扣    book.BookAuthor = "abcd";
 7bubuko.com,布布扣    book.BookPrice = 50.80;
 8bubuko.com,布布扣
 9bubuko.com,布布扣    MsgQueue.SendMessage(book);
10bubuko.com,布布扣}
bubuko.com,布布扣

      那么这里的测试,向队列里发送了一Book对象消息,根据上面分析,则多线程便会同时把此条消息读取出来: 
                            bubuko.com,布布扣
     呵呵,测试结果出来,看来此测试达到了我们之前所提出的需求。bubuko.com,布布扣!OK,关于MSMQ和多线程就简单介绍于此。

二、MSMQ在开塬项目PetShop中的应用分析。
      在PetShop 4.0中,利用消息队列临时存放要插入的数据,来避免因为频繁访问数据库的操作。而队列中的消息,则等待系统的专用的应用程序来处理,最后将数据插入到数据库中。
      PetShop 4.0中的消息处理,主要分为下面几大部分:订单策略接口IOrderStategy、消息接口IMessageing、消息工厂MessageFactory、MSMQ实现MSMQMessaging、后台处理应用程序OrderProessor。如下图:
                                            bubuko.com,布布扣
   
1、订单策略接口IOrderStategy
     PetShop 4.0的体系结构是非常庞大,在订单处理上有两种处理策略,这里也是策略模式的一个应用,IOrderStrategy接口作为订单策略的高层抽象,实现不同订单处理的具体策略去实现它,UML如下: 
                             bubuko.com,布布扣

示意性代码:

bubuko.com,布布扣
 1bubuko.com,布布扣namespace PetShop.IBLLStrategy
 2bubuko.com,布布扣{
 3bubuko.com,布布扣    public interface IOrderStrategy
 4bubuko.com,布布扣    {
 5bubuko.com,布布扣        void Insert(OrderInfo order);
 6bubuko.com,布布扣    }
 7bubuko.com,布布扣}
 8bubuko.com,布布扣
 9bubuko.com,布布扣namespace PetShop.BLL
10bubuko.com,布布扣{
11bubuko.com,布布扣    public class OrderSynchronous:IOrderStrategy
12bubuko.com,布布扣    {
13bubuko.com,布布扣        private static readonly IOrder asynchOrder = QueueAccess.CreateOrder();
14bubuko.com,布布扣
15bubuko.com,布布扣        public void Insert(OrderInfo order)
16bubuko.com,布布扣        {
17bubuko.com,布布扣            asynchOrder.Send(order);
18bubuko.com,布布扣        }
19bubuko.com,布布扣    }
20bubuko.com,布布扣}
21bubuko.com,布布扣
22bubuko.com,布布扣//bubuko.com,布布扣bubuko.com,布布扣
bubuko.com,布布扣

 

      从上面UML和代码就可以看出,订单策略接口下有两种实现,使用了抽象工厂模式来完成相应的订单策略对象的创建 。关于这点在后面消息工厂部分去介绍,这里不作讲解。

2、消息接口IMessageing
     在PetShop 4.0中,由于对订单处理使用了异步处理方式,在消息接口中仅定义了一个IOrder接口。IOrder接口的定义与MSMQ的实现是一致的,需要提供发送和接收操作。在Send方法中,参数为数据访问层的数据实体对象(OrderInfo),具体的实现则是用MSMQ的实现类(PetShop.MSMQMessaging.Order)去完成的。
             bubuko.com,布布扣           bubuko.com,布布扣
      MS的开发人员真的是什么都能想到,在消息接口的实现上考虑得很全面,为了避免将来的扩展会有其他的数据对象也使用到MSMQ;因此,在PetShop 4.0中的消息接口实现中,定义了一个队列的基类(PetShopQueue),实现了消息的发送(Send)和接收(Receive)方法的基本操作。代码如下:

bubuko.com,布布扣
 1bubuko.com,布布扣namespace PetShop.MSMQMessaging
 2bubuko.com,布布扣{
 3bubuko.com,布布扣    public class PetShopQueue:IDisposable
 4bubuko.com,布布扣    {
 5bubuko.com,布布扣        //指定消息队列事务的类型
 6bubuko.com,布布扣        protected MessageQueueTransactionType transactionType = MessageQueueTransactionType.Automatic;
 7bubuko.com,布布扣        protected MessageQueue queue;  //消息队列
 8bubuko.com,布布扣        protected TimeSpan timeout;    //时间间隔
 9bubuko.com,布布扣
10bubuko.com,布布扣        public PetShopQueue(string queuePath, int timeoutSeconds)
11bubuko.com,布布扣        {
12bubuko.com,布布扣            queue = new MessageQueue(queuePath);  //根据传入quueuPath创建队列
13bubuko.com,布布扣            timeout = TimeSpan.FromSeconds(Convert.ToDouble(timeoutSeconds));
14bubuko.com,布布扣
15bubuko.com,布布扣            queue.DefaultPropertiesToSend.AttachSenderId = false;
16bubuko.com,布布扣            queue.DefaultPropertiesToSend.UseAuthentication = false;
17bubuko.com,布布扣            queue.DefaultPropertiesToSend.UseEncryption = false;
18bubuko.com,布布扣            queue.DefaultPropertiesToSend.AcknowledgeType = AcknowledgeTypes.None;
19bubuko.com,布布扣            queue.DefaultPropertiesToSend.UseJournalQueue = false;
20bubuko.com,布布扣        }
21bubuko.com,布布扣
22bubuko.com,布布扣        /// <summary>
23bubuko.com,布布扣        /// 接收消息方法
24bubuko.com,布布扣        /// </summary>
25bubuko.com,布布扣        public virtual object Receive()
26bubuko.com,布布扣        {
27bubuko.com,布布扣            try
28bubuko.com,布布扣            {
29bubuko.com,布布扣                using (Message message = queue.Receive(timeout, transactionType))
30bubuko.com,布布扣                    return message;
31bubuko.com,布布扣            }
32bubuko.com,布布扣            catch (MessageQueueException mqex)
33bubuko.com,布布扣            {
34bubuko.com,布布扣                if (mqex.MessageQueueErrorCode == MessageQueueErrorCode.IOTimeout)
35bubuko.com,布布扣                    throw new TimeoutException();
36bubuko.com,布布扣                throw;
37bubuko.com,布布扣            }
38bubuko.com,布布扣        }
39bubuko.com,布布扣
40bubuko.com,布布扣        /// <summary>
41bubuko.com,布布扣        /// 发送消息
42bubuko.com,布布扣        /// </summary>
43bubuko.com,布布扣        public virtual void Send(object msg)
44bubuko.com,布布扣        {
45bubuko.com,布布扣            queue.Send(msg, transactionType);
46bubuko.com,布布扣        }
47bubuko.com,布布扣
48bubuko.com,布布扣        #region IDisposable 成员
49bubuko.com,布布扣        public void Dispose()
50bubuko.com,布布扣        {
51bubuko.com,布布扣            queue.Dispose();  //解放资源
52bubuko.com,布布扣        }
53bubuko.com,布布扣        #endregion
54bubuko.com,布布扣    }
55bubuko.com,布布扣}
bubuko.com,布布扣

     MSMQ队列是一个可持久的队列,不会因用户不间断的下订单导致数据丢失。queue作为存放数据的队列,为消息队列(MessageQueue)类型,同时还为PetShopQueue设置了timeout值,后台处理应用程序(OrderProessor)会根据timeout的值定期扫描队列中的订单数量。

3、消息工厂MessageFactory
     可能是考虑到IOrder的实现会改变不同的策略吧,在PetShop里利用了抽象工厂模式,将IOrder对象的创建用了专门的工厂模块(MessageFactory)进行封装,定义如下:

bubuko.com,布布扣
 1bubuko.com,布布扣namespace PetShop.MessagingFactory
 2bubuko.com,布布扣{
 3bubuko.com,布布扣    /// <summary>
 4bubuko.com,布布扣    /// This class is implemented following the Abstract Factory pattern to create the Order
 5bubuko.com,布布扣    /// Messaging implementation specified from the configuration file
 6bubuko.com,布布扣    /// </summary>
 7bubuko.com,布布扣    public sealed class QueueAccess
 8bubuko.com,布布扣    {
 9bubuko.com,布布扣        //<add key="OrderMessaging" value="PetShop.MSMQMessaging"/>
10bubuko.com,布布扣        private static readonly string path = "PetShop.MSMQMessaging";
11bubuko.com,布布扣
12bubuko.com,布布扣        /// <summary>
13bubuko.com,布布扣        /// 私有构造器,防止使用new创建对象实例
14bubuko.com,布布扣        /// </summary>
15bubuko.com,布布扣        private QueueAccess()
16bubuko.com,布布扣        { }
17bubuko.com,布布扣
18bubuko.com,布布扣        public static IOrder CreateOrder()
19bubuko.com,布布扣        {
20bubuko.com,布布扣            string className = path + ".Order";
21bubuko.com,布布扣            return (IOrder)Assembly.Load(path).CreateInstance(className);
22bubuko.com,布布扣        }
23bubuko.com,布布扣    }
24bubuko.com,布布扣}
bubuko.com,布布扣

     在QueueAccess类中,通过CreateOrder方法利用反射技术创建正确IOrder类型对象(实际也就是创建了一个接口的具体实现类的对象,应用了多态的原理和反射技术)。UML图下: 
                         bubuko.com,布布扣
     在PetShop4.0中,消息接口的具体实现是通过配置文件定义在web.config里:

bubuko.com,布布扣<add key="OrderMessaging" value="PetShop.MSMQMessaging"/>

     这里我为了能够更直观的演示和介绍就把path固化定义了,如下:

bubuko.com,布布扣private static readonly string path = "PetShop.MSMQMessaging";

     这里利用工厂模式来负责对象的创建,主要是方便业务逻辑层对定单处理策略的调用,如在PetShop.BLL模块中的OrderSynchronous类:

bubuko.com,布布扣
 1bubuko.com,布布扣namespace PetShop.BLL
 2bubuko.com,布布扣{
 3bubuko.com,布布扣    public class OrderSynchronous:IOrderStrategy
 4bubuko.com,布布扣    {
 5bubuko.com,布布扣        private static readonly IOrder asynchOrder = QueueAccess.CreateOrder();
 6bubuko.com,布布扣
 7bubuko.com,布布扣        public void Insert(OrderInfo order)
 8bubuko.com,布布扣        {
 9bubuko.com,布布扣            asynchOrder.Send(order);
10bubuko.com,布布扣        }
11bubuko.com,布布扣    }
12bubuko.com,布布扣}
bubuko.com,布布扣

这样一但IOrder接口的实现发生了变化,此时就只需要修改配置文件就OK,整个系统就显得很灵活,稳定。

4、MSMQ实现MSMQMessaging
    在PetShop.MSMQMessaging模块中,订单对象实现了消息接口(IMessaging)模块中的IOrder,同时还继承了基类PetShopQueue。定义如下:

bubuko.com,布布扣
 1bubuko.com,布布扣namespace PetShop.MSMQMessaging
 2bubuko.com,布布扣{
 3bubuko.com,布布扣    public class Order:PetShopQueue,IOrder
 4bubuko.com,布布扣    {
 5bubuko.com,布布扣        private static readonly string queuePath = ConfigurationManager.AppSettings["OrderQueuePath"];
 6bubuko.com,布布扣        private static int queueTimeout = 20;  //20秒为超时
 7bubuko.com,布布扣
 8bubuko.com,布布扣        public Order()
 9bubuko.com,布布扣            : base(queuePath, queueTimeout)
10bubuko.com,布布扣        {
11bubuko.com,布布扣            queue.Formatter = new BinaryMessageFormatter();
12bubuko.com,布布扣            Console.WriteLine(queuePath);
13bubuko.com,布布扣        }
14bubuko.com,布布扣
15bubuko.com,布布扣        public new OrderInfo Receive()
16bubuko.com,布布扣        {
17bubuko.com,布布扣            //该方法会应用在分布式事务中,故而设置为Automatic的事务类型。
18bubuko.com,布布扣            base.transactionType = MessageQueueTransactionType.Automatic;
19bubuko.com,布布扣            return (OrderInfo)((Message)base.Receive()).Body;
20bubuko.com,布布扣        }
21bubuko.com,布布扣
22bubuko.com,布布扣        public OrderInfo Receive(int timeout)
23bubuko.com,布布扣        {
24bubuko.com,布布扣            base.timeout = TimeSpan.FromSeconds(Convert.ToDouble(timeout));
25bubuko.com,布布扣            return Receive();
26bubuko.com,布布扣        }
27bubuko.com,布布扣
28bubuko.com,布布扣        /// <summary>
29bubuko.com,布布扣        /// 异步发送定单到消息队列
30bubuko.com,布布扣        /// </summary>
31bubuko.com,布布扣        /// <param name="orderMessage"></param>
32bubuko.com,布布扣        public void Send(OrderInfo orderMessage)
33bubuko.com,布布扣        {
34bubuko.com,布布扣            //该方法不会用于分布式事务中,故而设置为Single的事务类型。
35bubuko.com,布布扣            base.transactionType = MessageQueueTransactionType.Single;
36bubuko.com,布布扣            base.Send(orderMessage);
37bubuko.com,布布扣        }
38bubuko.com,布布扣    }
39bubuko.com,布布扣}
bubuko.com,布布扣

UML草图:
                                      bubuko.com,布布扣
     这里需要注意的是,Order类既继承了基类PetShopQueue,同时还实现了接口IOrder,而在消息接口和基类中所定义的接收消息(Receive)方法在方法的签名上是相同的。所以在Order的Receive方法实现中,必须使用new而非override关键字来重写父类PetShopQueue的Receive虚方法。此时Order类的Receive方法代表两个含义,一是实现了消息接口IOrder中的Receive方法;二则是利用了new关键字重写了父类PetShopQueue的Receive虚方法。
     在PetShop4.0中,将面向对象的知识点应用得非常精妙,如上分析,此时我门可以怎么来调用Order呢?是这样吗?

bubuko.com,布布扣
1bubuko.com,布布扣//1、使用基类PetShopQueue
2bubuko.com,布布扣PetShopQueue order = new Order();
3bubuko.com,布布扣order.Receive();
4bubuko.com,布布扣
5bubuko.com,布布扣//2、使用消息接口IOrder
6bubuko.com,布布扣IOrder  order = new Order();
7bubuko.com,布布扣order.Receive();
bubuko.com,布布扣

     根据多态原理,上面这两种实现都是正确的,那我们那取谁舍谁呢?在PetShop4.0中正确的调用是第2种方法,这种调用方法也更符合“面向接口设计”的原则。----详细请查看消息工厂MessageFactory部分的介绍。

5、后台处理应用程序OrderProessor
      前面一系列的操作,最终都会走向到这里,这里实现了最终的插入数据库的操作。在PetShop 4.0中OrderProessor是一个控制台应用程序,根据需求也可以将其设计为Windows Service。他完成的操作就是接收消息队列里的订单数据,将其插入到数据库。在OrderProessor里使用了多线程技术,监视队列里的订单信息,定期的将其处理。在主方法Main方法中用于控制线程序,核心的执行任何则委托给ProcessOrders方法去实现。

bubuko.com,布布扣
 1bubuko.com,布布扣private static void ProcessOrders()
 2bubuko.com,布布扣{
 3bubuko.com,布布扣    TimeSpan tsTimeout = TimeSpan.FromSeconds(Convert.ToDouble(transactionTimeout * batchSize));
 4bubuko.com,布布扣    Order order = new Order();  //逻辑层的PetShop.BLL.Order
 5bubuko.com,布布扣    while (true)
 6bubuko.com,布布扣    {
 7bubuko.com,布布扣        TimeSpan datatimeStarting = new TimeSpan(DateTime.Now.Ticks);
 8bubuko.com,布布扣        double elapsedTime = 0;
 9bubuko.com,布布扣
10bubuko.com,布布扣        int processedItems = 0;
11bubuko.com,布布扣        ArrayList queueOrders = new ArrayList();
12bubuko.com,布布扣
13bubuko.com,布布扣        //首先验证事务
14bubuko.com,布布扣        using (TransactionScope ts = new TransactionScope(TransactionScopeOption.Required, tsTimeout))
15bubuko.com,布布扣        {
16bubuko.com,布布扣            //从队列中检索订单
17bubuko.com,布布扣            for (int i = 0; i < batchSize; i++)
18bubuko.com,布布扣            {
19bubuko.com,布布扣                try
20bubuko.com,布布扣                {
21bubuko.com,布布扣                    //在一定时间 类接收队列的订单
22bubuko.com,布布扣                    if ((elapsedTime + queueTimeout + transactionTimeout) < tsTimeout.TotalSeconds)
23bubuko.com,布布扣                    {
24bubuko.com,布布扣                        queueOrders.Add(order.ReceiveFromQueue(queueTimeout));
25bubuko.com,布布扣                    }
26bubuko.com,布布扣                    else
27bubuko.com,布布扣                    {
28bubuko.com,布布扣                        i = batchSize;  // 结束循环
29bubuko.com,布布扣                    }
30bubuko.com,布布扣                    elapsedTime = new TimeSpan(DateTime.Now.Ticks).TotalSeconds - datatimeStarting.TotalSeconds;
31bubuko.com,布布扣                }
32bubuko.com,布布扣                catch (TimeoutException)
33bubuko.com,布布扣                {
34bubuko.com,布布扣                    //没有可以等待的消息也结束循环
35bubuko.com,布布扣                    i = batchSize;
36bubuko.com,布布扣                }
37bubuko.com,布布扣            }
38bubuko.com,布布扣
39bubuko.com,布布扣            //处理队列的订单
40bubuko.com,布布扣            for (int k = 0; k < queueOrders.Count; k++)
41bubuko.com,布布扣            {
42bubuko.com,布布扣                order.Insert((OrderInfo)queueOrders[k]);
43bubuko.com,布布扣                processedItems++;
44bubuko.com,布布扣                totalOrdersProcessed++;
45bubuko.com,布布扣            }
46bubuko.com,布布扣            //处理完毕或者是超时
47bubuko.com,布布扣            ts.Complete();
48bubuko.com,布布扣        }
49bubuko.com,布布扣        Console.WriteLine("(Thread Id " + Thread.CurrentThread.ManagedThreadId + ") batch finished, " + processedItems + " items, in " + elapsedTime.ToString() + " seconds.");
50bubuko.com,布布扣    }
51bubuko.com,布布扣}
bubuko.com,布布扣

     ProcessOrders方法首先通过业务逻辑层PetShop.BLL.Order类的方法ReceiveFromQueue去获取消息队列中的订单数据,并将其放入一个ArrayList对象,然后将其插入到数据库。 OrderProcessor的完整代码定义如下:

bubuko.com,布布扣
  1bubuko.com,布布扣using System;
  2bubuko.com,布布扣using System.Collections.Generic;
  3bubuko.com,布布扣using System.Text;
  4bubuko.com,布布扣using System.Configuration;
  5bubuko.com,布布扣using System.Threading;
  6bubuko.com,布布扣using PetShop.BLL;
  7bubuko.com,布布扣using System.Collections;
  8bubuko.com,布布扣using System.Transactions;
  9bubuko.com,布布扣using PetShop.Model;
 10bubuko.com,布布扣
 11bubuko.com,布布扣namespace PetShop.OrderProcessor
 12bubuko.com,布布扣{
 13bubuko.com,布布扣    class Program
 14bubuko.com,布布扣    {
 15bubuko.com,布布扣        private static int transactionTimeout = int.Parse(ConfigurationManager.AppSettings["TransactionTimeout"]);
 16bubuko.com,布布扣        private static int queueTimeout = int.Parse(ConfigurationManager.AppSettings["QueueTimeout"]);
 17bubuko.com,布布扣        private static int batchSize = int.Parse(ConfigurationManager.AppSettings["BatchSize"]);
 18bubuko.com,布布扣        private static int threadCount = int.Parse(ConfigurationManager.AppSettings["ThreadCount"]);
 19bubuko.com,布布扣        private static int totalOrdersProcessed = 0;
 20bubuko.com,布布扣
 21bubuko.com,布布扣        static void Main(string[] args)
 22bubuko.com,布布扣        {
 23bubuko.com,布布扣            Thread workTicketThread;
 24bubuko.com,布布扣            Thread[] workerThreads = new Thread[threadCount];
 25bubuko.com,布布扣
 26bubuko.com,布布扣            for (int i = 0; i < threadCount; i++)
 27bubuko.com,布布扣            {
 28bubuko.com,布布扣                workTicketThread = new Thread(new ThreadStart(ProcessOrders));
 29bubuko.com,布布扣                //指示线呈是否为一后台线程
 30bubuko.com,布布扣                workTicketThread.IsBackground = true;
 31bubuko.com,布布扣                workTicketThread.SetApartmentState(ApartmentState.STA);
 32bubuko.com,布布扣
 33bubuko.com,布布扣                workTicketThread.Start();
 34bubuko.com,布布扣                workerThreads[i] = workTicketThread;
 35bubuko.com,布布扣            }
 36bubuko.com,布布扣
 37bubuko.com,布布扣            Console.WriteLine("开始处理,按任意键停止.");
 38bubuko.com,布布扣            Console.ReadLine();
 39bubuko.com,布布扣            Console.WriteLine("正在终止线程,请等待bubuko.com,布布扣bubuko.com,布布扣");
 40bubuko.com,布布扣
 41bubuko.com,布布扣            //终止所以线程
 42bubuko.com,布布扣            for (int i = 0; i < workerThreads.Length; i++)
 43bubuko.com,布布扣            {
 44bubuko.com,布布扣                workerThreads[i].Abort();
 45bubuko.com,布布扣            }
 46bubuko.com,布布扣
 47bubuko.com,布布扣            Console.WriteLine();
 48bubuko.com,布布扣            Console.WriteLine(totalOrdersProcessed + " 张订单已经处理.");
 49bubuko.com,布布扣            Console.WriteLine("已终止处理.按任意键退出bubuko.com,布布扣bubuko.com,布布扣");
 50bubuko.com,布布扣            Console.ReadLine();
 51bubuko.com,布布扣        }
 52bubuko.com,布布扣
 53bubuko.com,布布扣        private static void ProcessOrders()
 54bubuko.com,布布扣        {
 55bubuko.com,布布扣            TimeSpan tsTimeout = TimeSpan.FromSeconds(Convert.ToDouble(transactionTimeout * batchSize));
 56bubuko.com,布布扣            Order order = new Order();  //逻辑层的PetShop.BLL.Order
 57bubuko.com,布布扣            while (true)
 58bubuko.com,布布扣            {
 59bubuko.com,布布扣                TimeSpan datatimeStarting = new TimeSpan(DateTime.Now.Ticks);
 60bubuko.com,布布扣                double elapsedTime = 0;
 61bubuko.com,布布扣
 62bubuko.com,布布扣                int processedItems = 0;
 63bubuko.com,布布扣                ArrayList queueOrders = new ArrayList();
 64bubuko.com,布布扣
 65bubuko.com,布布扣                //首先验证事务
 66bubuko.com,布布扣                using (TransactionScope ts = new TransactionScope(TransactionScopeOption.Required, tsTimeout))
 67bubuko.com,布布扣                {
 68bubuko.com,布布扣                    //从队列中检索订单
 69bubuko.com,布布扣                    for (int i = 0; i < batchSize; i++)
 70bubuko.com,布布扣                    {
 71bubuko.com,布布扣                        try
 72bubuko.com,布布扣                        {
 73bubuko.com,布布扣                            //在一定时间 类接收队列的订单
 74bubuko.com,布布扣                            if ((elapsedTime + queueTimeout + transactionTimeout) < tsTimeout.TotalSeconds)
 75bubuko.com,布布扣                            {
 76bubuko.com,布布扣                                queueOrders.Add(order.ReceiveFromQueue(queueTimeout));
 77bubuko.com,布布扣                            }
 78bubuko.com,布布扣                            else
 79bubuko.com,布布扣                            {
 80bubuko.com,布布扣                                i = batchSize;  // 结束循环
 81bubuko.com,布布扣                            }
 82bubuko.com,布布扣                            elapsedTime = new TimeSpan(DateTime.Now.Ticks).TotalSeconds - datatimeStarting.TotalSeconds;
 83bubuko.com,布布扣                        }
 84bubuko.com,布布扣                        catch (TimeoutException)
 85bubuko.com,布布扣                        {
 86bubuko.com,布布扣                            //没有可以等待的消息也结束循环
 87bubuko.com,布布扣                            i = batchSize;
 88bubuko.com,布布扣                        }
 89bubuko.com,布布扣                    }
 90bubuko.com,布布扣
 91bubuko.com,布布扣                    //处理队列的订单
 92bubuko.com,布布扣                    for (int k = 0; k < queueOrders.Count; k++)
 93bubuko.com,布布扣                    {
 94bubuko.com,布布扣                        order.Insert((OrderInfo)queueOrders[k]);
 95bubuko.com,布布扣                        processedItems++;
 96bubuko.com,布布扣                        totalOrdersProcessed++;
 97bubuko.com,布布扣                    }
 98bubuko.com,布布扣                    //处理完毕或者是超时
 99bubuko.com,布布扣                    ts.Complete();
100bubuko.com,布布扣                }
101bubuko.com,布布扣                Console.WriteLine("(Thread Id " + Thread.CurrentThread.ManagedThreadId + ") batch finished, " + processedItems + " items, in " + elapsedTime.ToString() + " seconds.");
102bubuko.com,布布扣            }
103bubuko.com,布布扣        }
104bubuko.com,布布扣    }
105bubuko.com,布布扣}
106bubuko.com,布布扣


      MSMQ技术除了用于异步处理之外,还可作为一种分布式处理技术应用。关于使用MSMQ进行分布式处理相关的内容,本人能力有限,还请大家查看相关的资料进行了解。
      本文介绍了MSMQ和多线程以及对PetShop 4.0中对MSMQ的应用进行了分析,结合之前我写的两篇关于MSMQ的相关知识点的介绍文章,对MSMQ算是建立了一个全面的认识,希望这三篇文章对大家学习MSMQ有所帮助。

文中示例代码下载

ASP.NET中进行消息处理(MSMQ) 三

标签:style   blog   http   io   os   使用   ar   for   strong   

原文地址:http://www.cnblogs.com/soundcode/p/4029961.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!