线程安全Ⅱ
首页 专栏 c# 文章详情
0

线程安全Ⅱ

DoubleJ 发布于 2 月 25 日
混合模式

由于用户模式和内核模式各有优劣,为了利用两者各自的有点,因此可以同时使用两种模式来进行构造,在没有线程竞争的时候可以具有用户模式的性能优势,而在多个线程同时竞争一个构造的时候又能提供不产生自旋的优点,使应用程序的性能得到提升。

示例代码
class HybridLock : IDisposable
{
    private int m_Waiters = 0;

    private AutoResetEvent m_WaiterLock = new AutoResetEvent(false);

    public void Enter()
    {
        //线程想要获得锁
        if (Interlocked.Increment(ref this.m_Waiters) == 1)
            return; //锁可以使用直,接返回

        //另一个线程正在等待,阻塞该线程
        this.m_WaiterLock.WaitOne(); //产生较大的性能影响
        //WaitOne返回后,这个线程便拥有了锁
    }

    public void Leave()
    {
        //这个线程准备释放锁
        if (Interlocked.Decrement(ref this.m_Waiters) == 0)
            return; //没有其它线程阻塞,直接返回

        //有其它线程正在阻塞,唤醒其中一个
        this.m_WaiterLock.Set(); //产生较大的性能影响
    }

    public void Dispose()
    {
        this.m_WaiterLock.Dispose();
    }
}
分析代码

HybridLock对象的Enter方法调用了Interlocked.Increment,让m_Waiters字段递增1,这个线程发现没有线程拥有这个锁,因此该线程获得锁并直接返回(获得锁的速度非常快)。如果另一个线程介入并调用Enter,此时m_Waiters字段再次递增值为2,发现锁被另一个线程拥有,所以这个线程会调用AutoResetEvent对象的WaitOne来阻塞本身(这里由于是内核模式所以会产生性能影响),防止自旋。

再看看Leave方法,一个线程调用Leave时,会调用Interlocked.Decrement使m_Waiters字段递减1,如果m_Waiters字段是0,说明没有其它线程在Enter的调用中发生阻塞,这时线程可以直接返回。但是如果线程发现m_Waiters字段的值递减后不为0,说明存在线程竞争,至少有一个线程在内核中阻塞,这个线程必须只能唤醒一个阻塞的线程,通过调用AutoResetEvent对象的Set方法实现,

自旋、线程所有权和递归

由于转换成内核代码会造成性能损失,而线程占有一个锁的时间通常比较短,所以可以先让线程处于用户模式且自旋一段时间,若还未获得锁的权限便可让它转为内核模式,如果线程在等待期间锁变得可用便可避免转为内核模式了。

示例代码(提供自旋、线程所有权和递归支持)
class HybridLock : IDisposable
{
    //用户模式
    private int m_Waiters = 0;

    //内核模式
    private AutoResetEvent m_WaiterLock = new AutoResetEvent(false);

    //控制自旋
    private int m_SpinCount = 1000;

    //用有锁的线程
    private int m_OwningThreadId = 0;

    //拥有次数
    private int m_Recursion = 0;

    public void Enter()
    {
        //如果线程已经拥有,递增递归次数并返回
        var threadId = Thread.CurrentThread.ManagedThreadId;
        if(this.m_OwningThreadId == threadId)
        {
            this.m_Recursion++;
            return;
        }

        //尝试获取
        var spinWait = new SpinWait();
        for (int i = 0; i < this.m_SpinCount; i++)
        {
            //如果锁可以使用了
            if (Interlocked.CompareExchange(ref this.m_Waiters, 1, 0) == 0)
                goto GotLock;

            //给其它线程运行的机会,希望锁会被释放
            spinWait.SpinOnce();
        }

        //自旋结束,仍然没有获得锁则再试一次
        if(Interlocked.Increment(ref this.m_Waiters) > 1)
        {
            //有其它线程被阻塞,这个线程也必须阻塞
            this.m_WaiterLock.WaitOne(); //性能损失
            //等待该线程用有锁醒来
        }

    GotLock:
        //一个线程用有锁时,记录ID并指出线程拥有锁一次
        this.m_OwningThreadId = threadId;
        this.m_Recursion = 1;
    }

    public void Leave()
    {
        //如果线程不用有锁,bug
        var threadId = Thread.CurrentThread.ManagedThreadId;
        if (threadId != this.m_OwningThreadId)
            throw new SynchronizationLockException("线程未拥有锁!");

        //递减递归计数,如果线程仍然用有锁,直接返回
        if (--this.m_Recursion > 0)
            return;

        //现在没有线程拥有锁
        this.m_OwningThreadId = 0;

        //若没有其它线程被阻塞直接返回
        if (Interlocked.Decrement(ref this.m_Waiters) == 0)
            return;

        //唤醒一个被阻塞的线程
        this.m_WaiterLock.Set(); //性能损失
    }

    public void Dispose()
    {
        this.m_WaiterLock.Dispose();
    }
}
Monitor类

这个类提供了一个互斥锁,这个锁支持自旋、线程所有权和递归,是线程同步常用的类。

工作方式

CLR初始化时分配一个同步块数组,一个对象在堆中创建的时候,都有两个额外的字段与它关联,第一个字段是“类型对象指针”,它包含类型对象的内存地址。第二个字段是“同步索引块”,它包含同步块数组中的一个整数索引。

一个对象在构造时,对象的同步块索引初始化为-1,表明不引用任何同步块。然后调用Monitor.Enter时,CLR在数组中找到一个空白同步块,并设置对象的同步块索引来引用该同步块。调用Exit时,会检查是否有其它任何线程正在等待使用对象的同步块,如果没有线程在等待,同步块就能自由被使用了,Exit将对象的同步块索引改回-1,自由的同步块将来可以和另一个对象关联。

堆中对象的同步块索引和CLR的同步块数组元素之间的关系图

Monitor的使用
class SomeType
{
    //对象私有锁
    private readonly object m_Lock = new object();

    public void DoSomething()
    {
        //进入私有锁
        Monitor.Enter(this.m_Lock);
        //其它代码...

        //退出私有锁
        Monitor.Exit(this.m_Lock);
    }
}
ReaderWriterLockSlim类
功能介绍
一个线程写入数据时,请求访问的其它所有线程都被阻塞 一个线程读取数据时,请求读取的其它线程允许继续执行,但请求写入的线程还是会被阻塞 写入数据的线程结束后,可以解除一个写入线程的阻塞,使它能写入数据,也可以解除所有读取线程的阻塞,让它们并发读取数据。如果没有线程被阻塞,锁就会进入自由状态,允许下一个reader或writer线程获取 读取数据的所有线程结束后,一个writer线程会被解除阻塞,使它能写入数据,如果没有线程被阻塞,锁就会进入自由状态,允许下一个reader或writer线程获取
ReaderWriterLockSlim用法
class SomeType : IDisposable
{
    private readonly ReaderWriterLockSlim m_Lock = new ReaderWriterLockSlim(LockRecursionPolicy.NoRecursion);

    public void DoWrite()
    {
        this.m_Lock.EnterWriteLock();
        //其它代码...

        //退出私有锁
        this.m_Lock.ExitWriteLock();
    }

    public void DoRead()
    {
        this.m_Lock.EnterReadLock();
        //其它代码...

        this.m_Lock.ExitReadLock();
    }

    public void Dispose()
    {
        this.m_Lock.Dispose();
    }
}

在构造ReaderWriterLockSlim对象的时候允许传递一个LockRecursionPolicy标志,如果传递LockRecursionPolicy.SupportsRecursion,锁就可以支持线程所有权和递归行为(这些行为会对锁的性能产生负面影响),如果支持这些行为锁必须跟踪允许进入锁的所有reader线程,同时为每个线程都单独维护一个递归技术,花费更大的代价,因此建议构造的时候传入LockRecursionPolicy.NoRecursion。

双检锁(延迟初始化)
class Singleton
{
    private static object s_Lock = new object();

    private static Singleton s_Value = null;

    //私有构造
    private Singleton() { }

    public static Singleton GetSingleton()
    {
        if (s_Value != null)
            return s_Value;

        //让一个线程创建它
        Monitor.Enter(s_Lock);
        if (s_Value == null)
        {
            var temp = new Singleton();
            Interlocked.Exchange(ref s_Value, temp);
        }
        Monitor.Exit(s_Lock);
        return s_Value;
    }
}
为何使用Interlocked.Exchange

为何不直接s_Value=new Singleton(),原因在于这种写法编译器可能先为Singleton分配内存,再将引用赋值给s_Value,最后调用构造器。从单线程来看这样并没什么问题,如果是多线程,此时在引用已经赋值给s_Value了,然而却还未调用构造器,这时另一个线程调用了GetSingleton方法,发现s_Value不为null,所以开始使用Singleton对象,但是对象的构造器还未执行结束,产生bug。而使用Interlocked.Exchange可以修正该问题,方法保证temp中的引用只有在构造器执行结束后才赋值给s_Value。

Monitor类的Wait与Pulse

如果一个线程希望一个复合条件为true时执行一些代码,便可以使用Wait与Pulse,在条件不满足的时候Wait,另一个线程更改条件后Pulse,而非让线程自旋连续检测条件,浪费CPU时间。

示例代码
class ConditionVariablePattern
{
    private readonly object m_Lock = new object();
    private bool m_Condition = false;

    public void Thread1()
    {
        //获取一个互斥锁
        Monitor.Enter(this.m_Lock);

        //在锁中原子性地检测复合条件
        while (!this.m_Condition)
        {
            //条件不满足,等待其它线程修改
            Monitor.Wait(this.m_Lock);
        }

        //条件满足,处理数据...
        Monitor.Exit(this.m_Lock);
    }

    public void Thread2()
    {
        //获取一个互斥锁
        Monitor.Enter(this.m_Lock);

        //处理数据并修改条件
        this.m_Condition = true;

        //Monitor.Pulse(this.m_Lock);     //释放之后唤醒一个正在等待的线程
        Monitor.PulseAll(this.m_Lock);  //释放之后唤醒所有正在等待的线程

        Monitor.Exit(this.m_Lock);
    }
}
代码解析

Thread1获取一个互斥锁,然后对一个条件变量进行检测,如果条件不满足,调用Monitor.Wait释放锁并阻塞调用线程,其它线程能获得改锁。

Thread2获取锁的所有权,处理数据,造成一些状态的改变,其中包括Thread1要检测的条件变量,然后调用Monitor.PulseAll或者Monitor.Pulse,从而解除一个因为调用Wait方法而进入阻塞的线程。

其中Pulse只解除等待最久的线程的阻塞,PulseAll解除所有等待线程的阻塞,然而解除阻塞的线程还必须等待Thread2线程调用完Exit才能拥有锁。

Thread1醒来时,进行下一次循环迭代,再次对条件进行检测,如过条件仍为false,继续调用Wait。如果条件满足,处理一些数据,最后调用Exit永久释放锁。

BlockingCollection类实现生产者/消费者模式
static void Main()
{
    //BlockingCollection类实现生产者/消费者模式
    var bl = new BlockingCollection<int>(new ConcurrentQueue<int>());

    //由一个线程池执行消费
    ThreadPool.QueueUserWorkItem(ConsumeItems, bl);

    //在集合中添加数据项
    for (int i = 0; i < 6; i++)
    {
        Console.WriteLine("production item : {0}", i);
        bl.Add(i);
    }

    //通知消费线程不会在集合中添加更多的item了
    bl.CompleteAdding();

    Console.ReadKey();
}
运行结果

生产与消费行为可能出现交错

c# 编程 线程安全 线程同步
阅读 33 发布于 2 月 25 日
收藏
分享
本作品系原创, 采用《署名-非商业性使用-禁止演绎 4.0 国际》许可协议
avatar
DoubleJ
1 声望
2 粉丝
关注作者
0 条评论
得票 时间
提交评论
avatar
DoubleJ
1 声望
2 粉丝
关注作者
宣传栏
目录
混合模式

由于用户模式和内核模式各有优劣,为了利用两者各自的有点,因此可以同时使用两种模式来进行构造,在没有线程竞争的时候可以具有用户模式的性能优势,而在多个线程同时竞争一个构造的时候又能提供不产生自旋的优点,使应用程序的性能得到提升。

示例代码
class HybridLock : IDisposable
{
    private int m_Waiters = 0;

    private AutoResetEvent m_WaiterLock = new AutoResetEvent(false);

    public void Enter()
    {
        //线程想要获得锁
        if (Interlocked.Increment(ref this.m_Waiters) == 1)
            return; //锁可以使用直,接返回

        //另一个线程正在等待,阻塞该线程
        this.m_WaiterLock.WaitOne(); //产生较大的性能影响
        //WaitOne返回后,这个线程便拥有了锁
    }

    public void Leave()
    {
        //这个线程准备释放锁
        if (Interlocked.Decrement(ref this.m_Waiters) == 0)
            return; //没有其它线程阻塞,直接返回

        //有其它线程正在阻塞,唤醒其中一个
        this.m_WaiterLock.Set(); //产生较大的性能影响
    }

    public void Dispose()
    {
        this.m_WaiterLock.Dispose();
    }
}
分析代码

HybridLock对象的Enter方法调用了Interlocked.Increment,让m_Waiters字段递增1,这个线程发现没有线程拥有这个锁,因此该线程获得锁并直接返回(获得锁的速度非常快)。如果另一个线程介入并调用Enter,此时m_Waiters字段再次递增值为2,发现锁被另一个线程拥有,所以这个线程会调用AutoResetEvent对象的WaitOne来阻塞本身(这里由于是内核模式所以会产生性能影响),防止自旋。

再看看Leave方法,一个线程调用Leave时,会调用Interlocked.Decrement使m_Waiters字段递减1,如果m_Waiters字段是0,说明没有其它线程在Enter的调用中发生阻塞,这时线程可以直接返回。但是如果线程发现m_Waiters字段的值递减后不为0,说明存在线程竞争,至少有一个线程在内核中阻塞,这个线程必须只能唤醒一个阻塞的线程,通过调用AutoResetEvent对象的Set方法实现,

自旋、线程所有权和递归

由于转换成内核代码会造成性能损失,而线程占有一个锁的时间通常比较短,所以可以先让线程处于用户模式且自旋一段时间,若还未获得锁的权限便可让它转为内核模式,如果线程在等待期间锁变得可用便可避免转为内核模式了。

示例代码(提供自旋、线程所有权和递归支持)
class HybridLock : IDisposable
{
    //用户模式
    private int m_Waiters = 0;

    //内核模式
    private AutoResetEvent m_WaiterLock = new AutoResetEvent(false);

    //控制自旋
    private int m_SpinCount = 1000;

    //用有锁的线程
    private int m_OwningThreadId = 0;

    //拥有次数
    private int m_Recursion = 0;

    public void Enter()
    {
        //如果线程已经拥有,递增递归次数并返回
        var threadId = Thread.CurrentThread.ManagedThreadId;
        if(this.m_OwningThreadId == threadId)
        {
            this.m_Recursion++;
            return;
        }

        //尝试获取
        var spinWait = new SpinWait();
        for (int i = 0; i < this.m_SpinCount; i++)
        {
            //如果锁可以使用了
            if (Interlocked.CompareExchange(ref this.m_Waiters, 1, 0) == 0)
                goto GotLock;

            //给其它线程运行的机会,希望锁会被释放
            spinWait.SpinOnce();
        }

        //自旋结束,仍然没有获得锁则再试一次
        if(Interlocked.Increment(ref this.m_Waiters) > 1)
        {
            //有其它线程被阻塞,这个线程也必须阻塞
            this.m_WaiterLock.WaitOne(); //性能损失
            //等待该线程用有锁醒来
        }

    GotLock:
        //一个线程用有锁时,记录ID并指出线程拥有锁一次
        this.m_OwningThreadId = threadId;
        this.m_Recursion = 1;
    }

    public void Leave()
    {
        //如果线程不用有锁,bug
        var threadId = Thread.CurrentThread.ManagedThreadId;
        if (threadId != this.m_OwningThreadId)
            throw new SynchronizationLockException("线程未拥有锁!");

        //递减递归计数,如果线程仍然用有锁,直接返回
        if (--this.m_Recursion > 0)
            return;

        //现在没有线程拥有锁
        this.m_OwningThreadId = 0;

        //若没有其它线程被阻塞直接返回
        if (Interlocked.Decrement(ref this.m_Waiters) == 0)
            return;

        //唤醒一个被阻塞的线程
        this.m_WaiterLock.Set(); //性能损失
    }

    public void Dispose()
    {
        this.m_WaiterLock.Dispose();
    }
}
Monitor类

这个类提供了一个互斥锁,这个锁支持自旋、线程所有权和递归,是线程同步常用的类。

工作方式

CLR初始化时分配一个同步块数组,一个对象在堆中创建的时候,都有两个额外的字段与它关联,第一个字段是“类型对象指针”,它包含类型对象的内存地址。第二个字段是“同步索引块”,它包含同步块数组中的一个整数索引。

一个对象在构造时,对象的同步块索引初始化为-1,表明不引用任何同步块。然后调用Monitor.Enter时,CLR在数组中找到一个空白同步块,并设置对象的同步块索引来引用该同步块。调用Exit时,会检查是否有其它任何线程正在等待使用对象的同步块,如果没有线程在等待,同步块就能自由被使用了,Exit将对象的同步块索引改回-1,自由的同步块将来可以和另一个对象关联。

堆中对象的同步块索引和CLR的同步块数组元素之间的关系图

Monitor的使用
class SomeType
{
    //对象私有锁
    private readonly object m_Lock = new object();

    public void DoSomething()
    {
        //进入私有锁
        Monitor.Enter(this.m_Lock);
        //其它代码...

        //退出私有锁
        Monitor.Exit(this.m_Lock);
    }
}
ReaderWriterLockSlim类
功能介绍
一个线程写入数据时,请求访问的其它所有线程都被阻塞 一个线程读取数据时,请求读取的其它线程允许继续执行,但请求写入的线程还是会被阻塞 写入数据的线程结束后,可以解除一个写入线程的阻塞,使它能写入数据,也可以解除所有读取线程的阻塞,让它们并发读取数据。如果没有线程被阻塞,锁就会进入自由状态,允许下一个reader或writer线程获取 读取数据的所有线程结束后,一个writer线程会被解除阻塞,使它能写入数据,如果没有线程被阻塞,锁就会进入自由状态,允许下一个reader或writer线程获取
ReaderWriterLockSlim用法
class SomeType : IDisposable
{
    private readonly ReaderWriterLockSlim m_Lock = new ReaderWriterLockSlim(LockRecursionPolicy.NoRecursion);

    public void DoWrite()
    {
        this.m_Lock.EnterWriteLock();
        //其它代码...

        //退出私有锁
        this.m_Lock.ExitWriteLock();
    }

    public void DoRead()
    {
        this.m_Lock.EnterReadLock();
        //其它代码...

        this.m_Lock.ExitReadLock();
    }

    public void Dispose()
    {
        this.m_Lock.Dispose();
    }
}

在构造ReaderWriterLockSlim对象的时候允许传递一个LockRecursionPolicy标志,如果传递LockRecursionPolicy.SupportsRecursion,锁就可以支持线程所有权和递归行为(这些行为会对锁的性能产生负面影响),如果支持这些行为锁必须跟踪允许进入锁的所有reader线程,同时为每个线程都单独维护一个递归技术,花费更大的代价,因此建议构造的时候传入LockRecursionPolicy.NoRecursion。

双检锁(延迟初始化)
class Singleton
{
    private static object s_Lock = new object();

    private static Singleton s_Value = null;

    //私有构造
    private Singleton() { }

    public static Singleton GetSingleton()
    {
        if (s_Value != null)
            return s_Value;

        //让一个线程创建它
        Monitor.Enter(s_Lock);
        if (s_Value == null)
        {
            var temp = new Singleton();
            Interlocked.Exchange(ref s_Value, temp);
        }
        Monitor.Exit(s_Lock);
        return s_Value;
    }
}
为何使用Interlocked.Exchange

为何不直接s_Value=new Singleton(),原因在于这种写法编译器可能先为Singleton分配内存,再将引用赋值给s_Value,最后调用构造器。从单线程来看这样并没什么问题,如果是多线程,此时在引用已经赋值给s_Value了,然而却还未调用构造器,这时另一个线程调用了GetSingleton方法,发现s_Value不为null,所以开始使用Singleton对象,但是对象的构造器还未执行结束,产生bug。而使用Interlocked.Exchange可以修正该问题,方法保证temp中的引用只有在构造器执行结束后才赋值给s_Value。

Monitor类的Wait与Pulse

如果一个线程希望一个复合条件为true时执行一些代码,便可以使用Wait与Pulse,在条件不满足的时候Wait,另一个线程更改条件后Pulse,而非让线程自旋连续检测条件,浪费CPU时间。

示例代码
class ConditionVariablePattern
{
    private readonly object m_Lock = new object();
    private bool m_Condition = false;

    public void Thread1()
    {
        //获取一个互斥锁
        Monitor.Enter(this.m_Lock);

        //在锁中原子性地检测复合条件
        while (!this.m_Condition)
        {
            //条件不满足,等待其它线程修改
            Monitor.Wait(this.m_Lock);
        }

        //条件满足,处理数据...
        Monitor.Exit(this.m_Lock);
    }

    public void Thread2()
    {
        //获取一个互斥锁
        Monitor.Enter(this.m_Lock);

        //处理数据并修改条件
        this.m_Condition = true;

        //Monitor.Pulse(this.m_Lock);     //释放之后唤醒一个正在等待的线程
        Monitor.PulseAll(this.m_Lock);  //释放之后唤醒所有正在等待的线程

        Monitor.Exit(this.m_Lock);
    }
}
代码解析

Thread1获取一个互斥锁,然后对一个条件变量进行检测,如果条件不满足,调用Monitor.Wait释放锁并阻塞调用线程,其它线程能获得改锁。

Thread2获取锁的所有权,处理数据,造成一些状态的改变,其中包括Thread1要检测的条件变量,然后调用Monitor.PulseAll或者Monitor.Pulse,从而解除一个因为调用Wait方法而进入阻塞的线程。

其中Pulse只解除等待最久的线程的阻塞,PulseAll解除所有等待线程的阻塞,然而解除阻塞的线程还必须等待Thread2线程调用完Exit才能拥有锁。

Thread1醒来时,进行下一次循环迭代,再次对条件进行检测,如过条件仍为false,继续调用Wait。如果条件满足,处理一些数据,最后调用Exit永久释放锁。

BlockingCollection类实现生产者/消费者模式
static void Main()
{
    //BlockingCollection类实现生产者/消费者模式
    var bl = new BlockingCollection<int>(new ConcurrentQueue<int>());

    //由一个线程池执行消费
    ThreadPool.QueueUserWorkItem(ConsumeItems, bl);

    //在集合中添加数据项
    for (int i = 0; i < 6; i++)
    {
        Console.WriteLine("production item : {0}", i);
        bl.Add(i);
    }

    //通知消费线程不会在集合中添加更多的item了
    bl.CompleteAdding();

    Console.ReadKey();
}
运行结果

生产与消费行为可能出现交错