码迷,mamicode.com
首页 > 其他好文 > 详细

转:Task任务调度实现生产者消费者模式

时间:2015-05-30 17:51:10      阅读:152      评论:0      收藏:0      [点我收藏+]

标签:

我们经常会遇到生产者消费者模式,比如前端各种UI操作事件触发后台逻辑等。在这种典型的应用场景中,我们可能会有4个业务处理逻辑(下文以P代表生产者,C代表消费者):

1. FIFO(先进先出)

     P产生1,2,3,4,5,6,3,2

     C处理顺序应为1,2,3,4,5,6,3,2

2.LIFO(后进先出)

     P产生1,2,3,4,5,6,3,2

     C处理顺序应为2,3,6,5,4,3,2,1

3.Dynamic FIFO(我定义为:去掉相同数据的FIFO, 如果产生的数据队列里已经有相同数据,后进的数据优先级高)

     P产生1,2,3,4,5,6,3,2

     C处理顺序为1,4,5,6,3,2

4.Dynamic LIFO(我定义为:去掉相同数据的LIFO, 如果产生的数据栈里已经有相同数据,后进的数据优先级高)

     P产生1,2,3,4,5,6,3,2

     C处理顺序为2,3,6,5,4,1

1,2情况为基本处理逻辑,3,4可能和我们实际场景有关系(包括:判断相同的逻辑可能不同、已存在和后续数据哪个优先级高)

C#中有个Task类进行异步操作,我们可以通过TaskScheduler类进行任务调度,实现上述的4种基本场景。

定义上述4种场景的通用接口以及其遍历类

public interface IScheduler : IEnumerable<Task >

    {

void Add (Task t);

void Remove (Task t);

int Count { get; }

Task this [int index] { get; set ; }

    }

public class SchedulerEnumerator : IEnumerator< Task>

    {

private IScheduler _collection;

private int _currentIndex;

private Task _currentTask;

public SchedulerEnumerator (IScheduler collection)

        {

_collection = collection ;

_currentIndex = -1;

_currentTask = default (Task);

        }

public bool MoveNext()

        {

//Avoids going beyond the end of the collection.

if (++_currentIndex >= _collection. Count)

            {

return false ;

            }

else

            {

// Set current box to next item in collection.

_currentTask = _collection [_currentIndex];

            }

return true ;

        }

public void Reset() { _currentIndex = -1; }

void IDisposable .Dispose() { }

public Task Current

        {

get { return _currentTask; }

        }

object IEnumerator .Current

        {

get { return Current; }

        }

    }

实现我们自己的任务调度类模板,可以通过T传递我们想要的队列类型

public class TaskSchedulerBase <T> : TaskScheduler

where T : IScheduler , new ()

    {

private Thread _processThread;

private readonly object _lock = new object ();

public TaskSchedulerBase()

        {

            _processThread = new Thread (this.Process);

        }

private void Process()

        {

lock (_lock)

            {

var tasks = GetScheduledTasks();

if (null != tasks)

                {

foreach (var t in tasks)

                    {

                        TryExecuteTask(t);

                        TryDequeue(t);

                    }

                }

            }

        }

protected override void QueueTask( Task task)

        {

lock (_lock)

            {

                Scheduler.Add(task);

if (_processThread.ThreadState.Equals(ThreadState .Stopped))

                {

                    _processThread = new Thread (Process);

                }

if (!_processThread.IsAlive

                    && !_processThread.ThreadState.Equals( ThreadState.Running))

                {

try

                    {

                        _processThread.Start();

                    }

catch (System.Exception )

                    {

if (!_processThread.ThreadState.Equals(ThreadState .Running))

                        {

                            _processThread = new Thread (Process);

                            _processThread.Start();

                        }

                    }

                }

            }

        }

protected override bool TryDequeue( Task task)

        {

            Scheduler.Remove(task);

return true ;

        }

protected override IEnumerable< Task> GetScheduledTasks()

        {

return Scheduler.ToArray();

        }

protected override bool TryExecuteTaskInline( Task task, bool taskWasPreviouslyQueued)

        {

if (taskWasPreviouslyQueued)

            {

if (TryDequeue(task))

                {

return base .TryExecuteTask(task);

                }

else

                {

return false ;

                }

            }

else

            {

return base .TryExecuteTask(task);

            }

        }

private readonly T _scheduler = new T();

public T Scheduler

        {

get

            {

return _scheduler;

            }

        }

    }


实现4种队列

     1.FIFO

public class QueueScheduler : IScheduler

    {

protected Queue <Task> _queue;

public QueueScheduler ()

        {

_queue = new Queue< Task>();

        }

public void Add( Task t )

        {

if (!Contains (t))

            {

_queue.Enqueue (t);

            }

        }

public void Remove( Task t )

        {

_queue.Dequeue ();

        }

public bool Contains( Task t )

        {

bool found = false;

foreach (var task in _queue )

            {

if (t .AsyncState != null && t .AsyncState. Equals(task .AsyncState))

                {

found = true ;

break;

                }

            }

return found ;

        }

public bool Contains( Task t , EqualityComparer< Task> comp )

        {

throw new NotImplementedException();

        }

public IEnumerator <Task> GetEnumerator()

        {

return new SchedulerEnumerator( this);

        }

IEnumerator IEnumerable .GetEnumerator()

        {

return new SchedulerEnumerator( this);

        }

public int Count

        {

get { return _queue. Count; }

        }

public Task this[ int index]

        {

get { return (Task) _queue.ToArray ()[index]; }

set { _queue .ToArray()[index] = value; }

        }

    }


     2.LIFO

public class StackScheduler : IScheduler

    {

protected Stack <Task> _stack;

public StackScheduler ()

        {

_stack = new Stack< Task>();

        }

public void Add( Task t )

        {

if (!Contains (t))

            {

_stack.Push (t);

            }

        }

public void Remove( Task t )

        {

_stack.Pop ();

        }

public bool Contains( Task t )

        {

bool found = false;

foreach (var task in _stack )

            {

if (t .AsyncState != null && t .AsyncState. Equals(task .AsyncState))

                {

found = true ;

break;

                }

            }

return found ;

        }

public bool Contains( Task t , EqualityComparer< Task> comp )

        {

throw new NotImplementedException();

        }

public IEnumerator <Task> GetEnumerator()

        {

return new SchedulerEnumerator( this);

        }

IEnumerator IEnumerable .GetEnumerator()

        {

return new SchedulerEnumerator( this);

        }

public int Count

        {

get { return _stack. Count; }

        }

public Task this[ int index]

        {

get { return (Task) _stack.ToArray ()[index]; }

set { _stack .ToArray()[index] = value; }

        }

    }


     3.Dynamic FIFO

public class DynamicQueueScheduler : IScheduler

    {

protected List <Task> _queue;

public DynamicQueueScheduler ()

        {

_queue = new List< Task>();

        }

public virtual void Add(Task t)

        {

Task oldTask = null;

if (Contains (t, out oldTask ))

            {

_queue.Remove (oldTask);

            }

_queue.Add (t);

        }

public virtual void Remove(Task t)

        {

_queue.Remove (t);

        }

public virtual bool Contains(Task t)

        {

Task oldTask = null;

bool found = Contains( t, out oldTask);

return found ;

        }

public virtual bool Contains(Task t, out Task oldTask)

        {

bool found = false;

oldTask = null ;

foreach (var task in _queue )

            {

if (t .AsyncState != null && t .AsyncState. Equals(task .AsyncState))

                {

oldTask = task ;

found = true ;

break;

                }

            }

return found ;

        }

public virtual bool Contains(Task t, EqualityComparer<Task > comp)

        {

throw new NotImplementedException();

        }

public IEnumerator <Task> GetEnumerator()

        {

return new SchedulerEnumerator( this);

        }

IEnumerator IEnumerable .GetEnumerator()

        {

return new SchedulerEnumerator( this);

        }

public int Count

        {

get { return _queue. Count; }

        }

public Task this[ int index]

        {

get { return (Task) _queue[index]; }

set { _queue [index] = value; }

        }

    }


     4.Dynamic LIFO

public class DynamicStackScheduler : IScheduler

    {

protected List <Task> _queue;

public DynamicStackScheduler ()

        {

_queue = new List< Task>();

        }

public void Add( Task t )

        {

Task oldTask = null;

if (Contains (t, out oldTask ))

            {

_queue.Remove (oldTask);

            }

_queue.Insert (0,t);

        }

public void Remove( Task t )

        {

_queue.Remove (t);

        }

public bool Contains( Task t )

        {

Task oldTask = null;

bool found = Contains( t, out oldTask);

return found ;

        }

public bool Contains( Task t , out Task oldTask )

        {

bool found = false;

oldTask = null ;

foreach (var task in _queue )

            {

if (t .AsyncState != null && t .AsyncState. Equals(task .AsyncState))

                {

oldTask = task ;

found = true ;

break;

                }

            }

return found ;

        }

public bool Contains( Task t , EqualityComparer< Task> comp )

        {

throw new NotImplementedException();

        }

public IEnumerator <Task> GetEnumerator()

        {

return new SchedulerEnumerator( this);

        }

IEnumerator IEnumerable .GetEnumerator()

        {

return new SchedulerEnumerator( this);

        }

public int Count

        {

get { return _queue. Count; }

        }

public Task this[ int index]

        {

get { return (Task) _queue[index]; }

set { _queue [index] = value; }

        }

    }


测试代码

class Program

    {

static Queue <int> _queue = new Queue< int>();

//static TaskFactory _factory = new TaskFactory(new TaskSchedulerBase<QueueScheduler>());

//static TaskFactory _factory = new TaskFactory(new TaskSchedulerBase<StackScheduler>());

//static TaskFactory _factory = new TaskFactory(new TaskSchedulerBase<DynamicQueueScheduler>());

//static TaskFactory _factory = new TaskFactory(new TaskSchedulerBase<DynamicStackScheduler>());

static TaskFactory _factory = new TaskFactory (new TaskSchedulerBase<DynamicQueueScheduler >());

static void Main( string[] args )

        {

var thread1 = new Thread(Producer );

var thread2 = new Thread(Consumer );

thread1.Start ();

thread2.Start ();

Console.ReadKey ();

        }

static void Producer()

        {

for (int i = 0; i < 7; i ++)

            {

_queue.Enqueue (i);

            }

_queue.Enqueue (3);

_queue.Enqueue (2);

        }

static void Consumer()

        {

while (true )

            {

if (_queue .Count > 0)

                {

foreach (var i in _queue )

                    {

_factory.StartNew ((s) =>

                        {

Console.Write ("{0} on thread {1} {2}\n", s,Thread.CurrentThread .ManagedThreadId,

DateTime.Now.ToLongTimeString());

                        }, i);

                    }

_queue.Clear ();

                }

else

                {

Thread.Sleep (1);

                }

            }

        }

    }

转:Task任务调度实现生产者消费者模式

标签:

原文地址:http://www.cnblogs.com/Redvelvet/p/4540566.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!