码迷,mamicode.com
首页 > 编程语言 > 详细

C#并行编程(6):线程同步面面观

时间:2019-04-12 13:26:53      阅读:170      评论:0      收藏:0      [点我收藏+]

标签:编译器   大于   阻塞   声明   adc   内存地址   tst   阅读   信号   

理解线程同步

线程的数据访问

在并行(多线程)环境中,不可避免地会存在多个线程同时访问某个数据的情况。多个线程对共享数据的访问有下面3种情形:

  1. 多个线程同时读取数据;
  2. 单个线程更新数据,此时其他线程读取数据;
  3. 多个线程同时更新数据。

显而易见,多个线程同时读取数据是不会产生任何问题的。仅有一个线程更新数据的时候,貌似也没有问题,但真的没有问题吗?多个线程同时更新数据,很明显,你可能把我的更改覆盖掉了,数据从此不再可信。

什么是线程同步

为了解决多线程同时访问共享数据可能导致数据被破坏的问题,我们需要采取一些措施来保证数据的一致性,让每个线程都能准确地读取或更新数据。

问题的根源在于多个线程同时访问数据,那么只要我们保证同一时间只有一个线程访问数据,就能解决问题。保证同一时间只有一个线程访问数据的处理,就是线程同步了。我在访问数据的时候,你们都先等着,我完事了你们再来。

C#中的线程同步

.NET提供了很多线程同步的方式,这些方式分为用户模式和内核模式以及混合模式(即用户模式与内核模式的结合),下面会总结C#/.NET中各模式下的线程同步。

用户模式与内核模式

Windows操作系统下,CPU跟据所执行代码的不同,会在两种模式下进行切换。CPU执行应用程序代码(如我们开发的.NET程序)时,一般运行在用户模式下;执行操作系统核心代码(内核函数或者某些设备驱动程序)时,CPU则切换到内核模式。

用户模式的代码只能访问自身进程的专有地址空间,代码异常不会影响到其他程序或者操作系统;内核模式的所有代码共享单个地址空间,代码异常将可能导致系统崩溃。CPU的模式切换,是为了保证应用程序和操作系统的稳定性。

应用程序中,线程可以通过Windows API调用操作系统内核函数,这时候执行线程的CPU将从用户模式切换到内核模式,执行完操作系统函数后,再由内核模式切换到用户模式。CPU的模式切换是很耗时的,据《Windows核心编程》中的描述,CPU模式的切换,要占用1000个以上的CPU周期。因此,在我们的.NET程序中,应该尽可能地避免CPU的模式切换。

用户模式线程同步

用户模式下,利用特殊的CPU指令来协调线程,使同一时间只有一个线程能访问某内存地址,这种协调在硬件中发生,速度很快。这种模式下,CPU指令对线程的阻塞很短暂,操作系统调度线程时不会认为该线程已被阻塞,这种情况下,线程池不会创建新的线程来替换该线程。

用户模式下,等待资源的线程会一直被操作系统调度,导致线程的“自旋”并因此浪费很多的CPU资源。如果某线程一直占着资源不释放,等待该资源的线程将一直处于自旋状态,这样就造成了“活锁”,活锁除了浪费内存外,还会浪费大量CPU。

.NET提供两种用户模式的线程同步,volatileinterlocked,即易变和互锁。

volatile关键字和Volatile

上面我们遗留了一个问题:只有一个线程更新数据,其他线程读取数据,会不会出现问题?先看一个例子:

private static bool _stop;
public static void Run()
{//主线程
    Task.Run(() =>
    {//任务线程
        int number = 1;
        while (!_stop) //读取_stop
        {
            number++;
        }
        Console.WriteLine($"increase stopped,value = {number}");
    });

    Thread.Sleep(1000);
    _stop = true; //更新_stop
}

编译器和CPU会对上面的代码进行优化(调试模式不会优化),任务线程在执行时,会把_stop读取到CPU寄存器中,while循环的时候,每次都从当前CPU寄存器中读取_stop;同样,主线程执行的时候CPU也会把_stop读取到寄存器,更新_stop时,先更新是CPU寄存器中的_stop值,再把值存到变量_stop;在并行环境中,主线程和任务线程独立执行,主线程对_stop的更新并不会公开到任务线程,这样,任务线程的while循环便不会停止,永远无法得到输出。

把变量读到寄存器只是CPU优化代码的一种方式,CPU还可能调整代码的执行顺序,当前,CPU任务这种调整不会改变代码的意图。上面的代码说明,由于编译器和CPU的优化,只有一个线程更新数据,也可能存在问题

这种情况,我们可以使用volatile关键字或者类System.Threading.Volatile来阻止编译器和CPU的优化,这种阻止利用的是内存屏障MemoryBarrier,它告诉CPU在执行完屏障之前的内存存取后才能执行屏障后面的内存存取。上面代码的问题在于,while循环读取到的值总是CPU寄存器中的false。我们把while循环的条件改成!Volatile.Read(ref _stop)或者把用volatile声明变量_stop,while条件直接读取内存中的值,问题就能得到解决。

Interlocked原子访问

.NET提供的另一种用户模式线程同步方式是System.Threading.InterlockedInterlocked的工作依赖于代码运行的CPU平台,如果是X86的CPU,Interlocked函数会在总线上维持一个硬件信号,来阻止其他CPU访问同一内存地址(《Windows核心编程第五版》)。计算机对变量的修改一般来说并不是原子性的,而是分为3个步骤:

  1. 将变量值加载到CPU寄存器
  2. 改变值
  3. 将更新后的值存储到内存中

假如执行了前两个步骤后,CPU被抢占,变量在之前线程中的修改将丢失。Interlocked函数保证对值的修改是原子性的,一个线程完成变量的修改和存储后,另一个线程才能修改变量

System.Threading.Interlocked提供了很多方法,例如递增、递减、求和等,下面用Interlocked的递增方法展示其线程同步功能。

public static void Run()
{
    DoIncrease(100000);
}

private static void DoIncrease(int incrementPerThread)
{
    int number1 = 0;
    int number2 = 0;

    Console.WriteLine($"use two threads to increase zero. each thread increase {incrementPerThread}.");

    IList<Task> increaseTasks = new List<Task>();

    increaseTasks.Add(Task.Run(() =>
    {
        Console.WriteLine($"thread #{Thread.CurrentThread.ManagedThreadId} increasing number1.");
        for (int i = 0; i < incrementPerThread; i++)
        {
            Interlocked.Increment(ref number1);
        }
    }));
    increaseTasks.Add(Task.Run(() =>
    {
        Console.WriteLine($"thread #{Thread.CurrentThread.ManagedThreadId} increasing number1.");
        for (int i = 0; i < incrementPerThread; i++)
        {
            Interlocked.Increment(ref number1);
        }
    }));
    increaseTasks.Add(Task.Run(() =>
    {
        Console.WriteLine($"thread #{Thread.CurrentThread.ManagedThreadId} increasing number2.");
        for (int i = 0; i < incrementPerThread; i++)
        {
            number2++;
        }
    }));
    increaseTasks.Add(Task.Run(() =>
    {
        Console.WriteLine($"thread #{Thread.CurrentThread.ManagedThreadId} increasing number2.");
        for (int i = 0; i < incrementPerThread; i++)
        {
            number2++;
        }
    }));

    Task.WaitAll(increaseTasks.ToArray());

    Console.WriteLine($"use interlocked: number1 result = {number1}");
    Console.WriteLine($"normal increase: number2 result = {number2}");
}

运行上面的代码多次(每个线程增加的数量尽量大,否则不容易体现结果),每次number1的结果都一样,number2的结果都不同,足以体现Interlocked的线程同步功能。

SpinLock自旋锁

System.Threading.SpinLock是基于InterLocked和SpinWait实现的轻量级自旋锁,具体的实现方式这里不去关心。SpinLock的简单用法如下:

private static SpinLock _spinlock = new SpinLock();
public static void DoWork()
{
    bool lockTaken = false;
    try
    {
        _spinlock.Enter(ref lockTaken);
        //DoWork
    }
    finally
    {
        if (lockTaken)
        {
            _spinlock.Exit(false);
        }
    }
}

SpinLock很轻量级,性能较高,但由于是自旋锁,锁定的操作应该是很快完成,否则会因线程自旋而浪费CPU。

内核模式线程同步

除了用户模式的两种线程同步方式,我们还会利用Windows系统的内核对象实现线程的同步。使用系统内核对象将会导致执行线程的CPU运行模式的切换,这会有很大的消耗,所以能够使用用户模式的线程同步就尽量避免使用内核模式。

内核模式下,线程在等待资源时会被系统阻塞,避免了CPU的浪费,这是内核模式优势。假如线程等待的资源一直被占用则线程将一直处于阻塞状态,造成“死锁”。相对于活锁,死锁只会浪费内存资源。

我们使用系统内核中的事件、信号量和互斥量进行内核模式的线程同步。

利用内核事件实现线程同步

事件实际上是由系统内核维护的一个布尔值。

.NET提供System.Threading.EventWaitHandle进行线程的信号交互。EventWaitHandle继承WaitHandle(封装等待对共享资源独占访问的操作系统特定的对象),有三个关键方法:

  • Set():将事件状态设置为终止状态,允许一个或多个等待线程继续。
  • Reset():将事件状态设置为非终止状态,导致线程阻塞
  • WaitOne():阻塞线程直到收到事件状态信号

线程交互事件有自动重置和手动重置两种类型,分别由AutoResetEventManualResetEvent继承EventWaitHandle得到。自动重置事件在Set唤醒第一个阻塞线程之后,会自动Reset事件,其他阻塞线程仍保持阻塞状态;而手动重置事件Set时,会唤醒所有被该事件阻塞的线程,手动Reset后,事件才会继续起作用。手动重置事件的这种性质,导致它不能用于线程同步,因为不能保证同一时间只有一个线程访问资源;相反,自动重置时间则很适合用来处理线程同步。

下面的例子演示了利用自动重置时间进行的线程同步。

public static void Run()
{
    DoIncrease(100000);
}

private static void DoIncrease(int incrementPerThread)
{
    int number = 0;
    Console.WriteLine($"use two threads to increase zero. each thread increase {incrementPerThread}.");

    AutoResetEvent are = new AutoResetEvent(true);//初始化一个终止状态的线程同步事件

    IList<Task> increaseTasks = new List<Task>();
    increaseTasks.Add(Task.Run(() =>
    {
        Console.WriteLine($"thread #{Thread.CurrentThread.ManagedThreadId} is increasing the number.");
        for (int i = 0; i < incrementPerThread; i++)
        {
            are.WaitOne();// 阻塞线程,直到被同步事件唤醒
            number++;
            are.Set();// 将事件设为终止状态,唤醒其他线程
        }
    }));
    increaseTasks.Add(Task.Run(() =>
    {
        Console.WriteLine($"thread #{Thread.CurrentThread.ManagedThreadId} is increasing the number.");
        for (int i = 0; i < incrementPerThread; i++)
        {
            are.WaitOne();
            number++;
            are.Set();
        }
    }));

    Task.WaitAll(increaseTasks.ToArray());
    are.Dispose();
    Console.WriteLine($"use AutoResetEvent: result = {number}");
}

利用信号量进行线程同步

信号量是系统内核维护的一个整型变量。

信号量值为0时,所有等待信号量的线程会被阻塞;信号量值大于零0,等待的线程会被解除阻塞,每唤醒一个阻塞的线程,系统内核就会把信号量的值减1。此外,我们能够对信号量进行最大值限制,从而控制访问同一资源的最大线程数量。

.Net中,利用System.Threading.Semaphore进行信号量操作。下面时利用信号量实现线程同步的一个例子。

public static void Run()
{
    DoIncrease(100000);
}

private static void DoIncrease(int incrementPerThread)
{
    int number = 0;
    Console.WriteLine($"use two threads to increase zero. each thread increase {incrementPerThread}.");

    Semaphore semaphore = new Semaphore(1,1); //初始化信号量,这里初始值要设置为1,否则同步会有问题

    IList<Task> increaseTasks = new List<Task>();
    increaseTasks.Add(Task.Run(() =>
    {
        Console.WriteLine($"thread #{Thread.CurrentThread.ManagedThreadId} is increasing the number.");
        for (int i = 0; i < incrementPerThread; i++)
        {
            semaphore.WaitOne();
            number++;
            semaphore.Release(1);// 退出信号量

        }
    }));
    increaseTasks.Add(Task.Run(() =>
    {
        Console.WriteLine($"thread #{Thread.CurrentThread.ManagedThreadId} is increasing the number.");
        for (int i = 0; i < incrementPerThread; i++)
        {
            semaphore.WaitOne();
            number++;
            semaphore.Release(1);

        }
    }));

    Task.WaitAll(increaseTasks.ToArray());
    semaphore.Dispose();
    Console.WriteLine($"use Semaphore: result = {number}");
}

利用互斥体进程线程同步

互斥体Mutex的使用与自动重置事件和信号量类似,这里不再进行详细的总结。

互斥体常被用来保证应用程序只有一个实例运行,具体用法如下:

bool createNew;
using (new Mutex(true, Assembly.GetExecutingAssembly().FullName, out createNew))
{
    if (!createNew)
    {//系统已经存在同名的互斥体,说明已有程序实例在运行
        //这里做一些提示
        Environment.Exit(0);//退出
    }
    else
    {
        //启动实例的代码
    }
}

线程同步的混合模式

通过上面的总结我们知道,用户模式和内核模式由各自的优缺点,需要有一种模式既能兼顾用户和内核模式的优点又能避免他们的缺点,这就是混合模式。

混合模式会优先使用用户模式的线程同步处理,当多个线程竞争同步锁的时候,才会使用内核对象进行处理。如果多个线程一直不产生资源竞争,就不会发生CPU用户模式到内核模式的转换,开始资源竞争时,又会通过线程阻塞来防止CPU资源的浪费。

.NET中提供了多种混合模式的线程同步方式。例如手工重置事件和信号量的简化版本ManualResetEventSlimSemaphoreSlim,他们是线程在用户模式中自旋,直到发生资源竞争。具体使用与各自的内核模式一样,这里不再赘述。

lock关键字和Monitor

相信lock加锁是很多人做常用的线程同步方式。lock的使用很简单,如下:

private static readonly object _syncObject = new object();
public static void DoWork()
{
    lock (_syncObject)
    {
        //DoWork
    }
}

实际上,lock语法是对System.Threading.Monitor使用的一种简化,Monitor的用法如下:

private static readonly object _syncObject = new object();
public static void DoWork()
{
    Monitor.Enter(_syncObject);
    //DoWork
    Monitor.Exit(_syncObject);
}

使用Monitor的可能会出先一些意象不到的问题。例如,如果不相关的业务代码在使用Monitor进行线程同步的时候,锁定了同一字符串,将会造成不相关业务代码的同步执行;此外需要注意的是,Monitor不能使用值类型作为锁对象,值类型会被装箱,装箱后的对象不同,将导致无法同步。

读写锁ReaderWriterLockSlim

ReaderWriterLockSlim可以用来实现多线程读取或独占写入的资源访问。读写锁的线程控制逻辑如下:

  • 一个线程写数据时,其他请求资源的线程全部被阻塞;
  • 一个线程读数据时,写线程被阻塞,其他读线程能继续运行;
  • 写结束时,解除其他某个写线程的阻塞,或者解除所有读线程的阻塞;
  • 读结束时,解除一个写线程的阻塞。

下面是读写锁的简单用法,详细用法可参考msdn文档。

private static readonly ReaderWriterLockSlim _rwlock = new ReaderWriterLockSlim();
public static void DoWork()
{
_rwlock.EnterWriteLock();
//DoWork
_rwlock.ExitWriteLock();
}

ReaderWriterLockSlim还有一个比较老的版本ReaderWriterLock,据说存在较多问题应尽量避免使用。

线程安全集合

.NET除了提供包含上面总结到的各种线程同步的诸多方式外,还封装了一些线程安全集合。这些集合在内部实现了线程同步,我们直接使用即可,很友好。线程安全集合在命名空间System.Collections.Concurrent下,包括ConcurrentQueue (T),ConcurrentStack<T>,ConcurrentDictionary<TKey,TValue>,ConcurrentBag<T>,BlockingCollection<T>,具体可阅读《何时使用线程安全集合》

各种线程同步性能对比

下面我们对整数零进行多线程递增操作,每个线程固定递增量,来测试以下各种同步方式的性能对比。测试代码如下。

/// <summary>
/// 全局目标数据,使用多线程进行递增
/// </summary>
private static int _numberToIncrease;

public static void Run()
{
    int increment = 100000;
    int threadCount = 4;
    DoIncrease(increment, threadCount, DoIncreaseByInterLocked);
    DoIncrease(increment, threadCount, DoIncreaseWithSpinLock);
    DoIncrease(increment, threadCount, DoIncreaseWithEvent);
    DoIncrease(increment, threadCount, DoIncreaseWithSemaphore);
    DoIncrease(increment, threadCount, DoIncreaseWithMonitor);
    DoIncrease(increment, threadCount, DoIncreaseWithReaderWriterLockSlim);
}

/// <summary>
/// 递增运算
/// </summary>
/// <param name="increment">单线程递增量</param>
/// <param name="threadCount">线程数</param>
/// <param name="action">递增方法</param>
public static void DoIncrease(int increment, int threadCount, Action<int> action)
{
    _numberToIncrease = 0; //重置目标数据
    IList<Task> increaseTasks = new List<Task>(threadCount);
    Stopwatch watch = Stopwatch.StartNew();
    for (int i = 0; i < threadCount; i++)
    {
        increaseTasks.Add(Task.Run(() => action(increment)));
    }
    Task.WaitAll(increaseTasks.ToArray());
    Console.WriteLine($"{action.Method.Name}=> Result: {_numberToIncrease} , Time: {watch.ElapsedMilliseconds} ms.");
}

#region 使用Interlocked,用户模式

public static void DoIncreaseByInterLocked(int increment)
{
    for (int i = 0; i < increment; i++)
    {
        Interlocked.Increment(ref _numberToIncrease);
    }
}

#endregion

#region 使用SpinLock,用户模式

private static SpinLock _spinlock = new SpinLock();
public static void DoIncreaseWithSpinLock(int increment)
{
    for (int i = 0; i < increment; i++)
    {
        bool lockTaken = false;
        try
        {
            _spinlock.Enter(ref lockTaken);
            _numberToIncrease++;
        }
        finally
        {
            if (lockTaken)
            {
                _spinlock.Exit(false);
            }
        }
    }
}

#endregion

#region 使用信号量Semaphore,内核模式

private static readonly Semaphore _semaphore = new Semaphore(1, 10);

public static void DoIncreaseWithSemaphore(int increment)
{
    for (int i = 0; i < increment; i++)
    {
        _semaphore.WaitOne();
        _numberToIncrease++;
        _semaphore.Release(1);
    }
}

#endregion

#region 使用事件AutoResetEvent,内核模式

private static readonly AutoResetEvent _are = new AutoResetEvent(true);
public static void DoIncreaseWithEvent(int increment)
{
    for (int i = 0; i < increment; i++)
    {
        _are.WaitOne();
        _numberToIncrease++;
        _are.Set();
    }
}

#endregion

#region 使用Monitor,混合模式

private static readonly object _monitorLocker = new object();
public static void DoIncreaseWithMonitor(int increment)
{
    for (int i = 0; i < increment; i++)
    {
        bool lockTaken = false;
        try
        {
            Monitor.Enter(_monitorLocker, ref lockTaken);
            _numberToIncrease++;
        }
        finally
        {
            if (lockTaken)
            {
                Monitor.Exit(_monitorLocker);
            }
        }
    }
}

#endregion

#region 使用ReaderWriterLockSlim,混合模式

private static readonly ReaderWriterLockSlim _rwlock = new ReaderWriterLockSlim();
public static void DoIncreaseWithReaderWriterLockSlim(int increment)
{
    for (int i = 0; i < increment; i++)
    {
        _rwlock.EnterWriteLock();
        _numberToIncrease++;
        _rwlock.ExitWriteLock();
    }
}

#endregion

下面是一组测试结果,可以很明显地看出,内核模式是相当耗时的,应尽量避免使用。而用户模式和混合模式,也需要根据具体的场景进行选择。这个测试过于简单,不具有普遍性。

DoIncreaseByInterLocked=> Result: 400000 , Time: 15 ms.
DoIncreaseWithSpinLock=> Result: 400000 , Time: 75 ms.
DoIncreaseWithEvent=> Result: 400000 , Time: 1892 ms.
DoIncreaseWithSemaphore=> Result: 400000 , Time: 1779 ms.
DoIncreaseWithMonitor=> Result: 400000 , Time: 14 ms.
DoIncreaseWithReaderWriterLockSlim=> Result: 400000 , Time: 22 ms.

小结

本文对C#/.NET中的线程同步进行了尽量详尽的总结,并行环境中在追求程序的高性能、响应性的同时,务必要保证数据的安全性。

C#并行编程系列的文章暂时就告一段落了。刚开始写博客,文章肯定存在不少问题,欢迎各位博友指出。

C#并行编程(6):线程同步面面观

标签:编译器   大于   阻塞   声明   adc   内存地址   tst   阅读   信号   

原文地址:https://www.cnblogs.com/chenbaoshun/p/10695343.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!