• 6.3 基于锁设计更加复杂的数据结构
    • 6.3.1 编写一个使用锁的线程安全查询表
    • 6.3.2 编写一个使用锁的线程安全链表

    6.3 基于锁设计更加复杂的数据结构

    栈和队列都很简单:接口相对固定,并且应用于比较特殊的情况。并不是所有数据结构都像它们一样简单,大多数数据结构支持更加多样化的操作。这将增大并行的可能性,但是也让对数据保护变得更加困难,因为要考虑对所有能访问到的部分变多了。当为了并发访问对数据结构进行设计时,一系列原有的操作就变得越发需要重点处理。

    先来看看,在设计查询表时所遇到的一些问题。

    6.3.1 编写一个使用锁的线程安全查询表

    查询表或字典是一种类型的值(键值)和另一种类型的值进行关联(映射)的方式。一般情况下,这样的结构允许代码通过键值对相关的数据值进行查询。C++标准库中相关工具:std::map<>, std::multimap<>, std::unordered_map<>std::unordered_multimap<>

    查询表的使用方式与栈和队列不同。栈和队列上几乎每个操作都会对数据结构进行修改,不是添加一个元素,就是删除一个,而对于查询表来说,几乎不需要什么修改。清单3.13中有个例子,是一个简单的域名系统(DNS)缓存,其特点是相较于std::map<>削减了很多的接口。和队列和栈一样,标准容器的接口不适合多线程进行并发访问,因为这些接口都存在固有的条件竞争,所以有些接口需要砍掉或重新修订。

    并发访问时,std::map<>接口最大的问题在于——迭代器。虽然,多线程访问(或修改)容器时,可能会有提供安全访问的迭代器,但这就问题棘手之处。要想正确的处理迭代器,可能会碰到下面的问题:迭代器引用的元素被其他线程删除时,迭代器就会出问题。线程安全的查询表的第一次接口削减,需要绕过迭代器。std::map<>(以及标准库中其他相关容器)给定的接口对于迭代器的依赖很严重,其中有些接口需要先放在一边,先对一些简单接口进行设计。

    查询表的基本操作有:

    • 添加一对“键值-数据”

    • 修改指定键值所对应的数据

    • 删除一组值

    • 通过给定键值,获取对应数据

    容器也有一些操作是非常有用的,比如:查询容器是否为空,键值列表的完整快照和“键值-数据”的完整快照。

    如果坚持线程安全指导意见,例如:不要返回一个引用,并且用一个简单的互斥锁对每一个成员函数进行上锁,以确保每一个函数线程安全。最有可能的条件竞争在于,当一对“键值-数据”加入时;当两个线程都添加一个数据,那么肯定一先一后。一种方式是合并“添加”和“修改”操作,为一个成员函数,就像清单3.13对域名系统缓存所做的那样。

    从接口角度看,有一个问题很是有趣,那就是任意(if any)部分获取相关数据。一种选择是在键值没有对应值的时候进行返回时,允许用户提供一个“默认”值:

    1. mapped_type get_value(key_type const& key, mapped_type default_value);

    种情况下,当default_value没有明确的给出时,默认构造出的mapped_type实例将被使用。也可以扩展成返回一个std::pair<mapped_type, bool>来代替mapped_type实例,其中bool代表返回值是否是当前键对应的值。另一个选择是返回一个有指向数据的智能指针,当指针的值是NULL时,这个键值就没有对应的数据。

    当接口确定时,(假设没有接口间的条件竞争)就需要保证线程安全了,可以通过对每一个成员函数使用一个互斥量和一个简单的锁来保护底层数据。不过,当独立函数对数据结构进行读取和修改时,就会降低并发的可能性。一个选择是使用一个互斥量去面对多个读者线程或一个作者线程,如同在清单3.13中使用std::shared_mutex一样。虽然,这将提高并发访问,但是同时只有一个线程能对数据结构进行修改。理想很美好,现实很骨感?我们应该能做的更好!

    为细粒度锁设计一个映射结构

    对队列的讨论中(在6.2.3节),为了允许细粒度锁能正常工作,需要对数据结构的细节进行仔细的考虑,而非直接使用已知容器,例如std::map<>。列出三个常见关联容器的方式:

    • 二叉树,比如:红黑树

    • 有序数组

    • 哈希表

    二叉树的方式,不会对提高并发访问的能力;每一个查找或者修改操作都需要访问根节点,因此,根节点需要上锁。虽然,访问线程在向下移动时,这个锁可以进行释放,但相比横跨整个数据结构的单锁,并没有什么优势。

    有序数组是最坏的选择,因为你无法提前言明数组中哪段是有序的,所以你需要用一个锁将整个数组锁起来。

    那么就剩哈希表了。假设有固定数量的桶,每个桶都有一个键值(关键特性),以及散列函数。这就意味着你可以安全的对每个桶上锁。当再次使用互斥量(支持多读者单作者)时,就能将并发访问的可能性增加N倍,这里N是桶的数量。当然,缺点也是有的:对于键值的操作,需要有合适的函数。C++标准库提供std::hash<>模板,可以直接使用。对于特化的类型,比如int,以及通用库类型std::string,并且用户可以简单的对键值类型进行特化。如果去效仿标准无序容器,并且获取函数对象的类型作为哈希表的模板参数,用户可以选择是否特化std::hash<>的键值类型,或者提供一个独立的哈希函数。

    那么,让我们来看一些代码吧。怎样的实现才能完成一个线程安全的查询表?下面提供一种方式。

    清单6.11 线程安全的查询表

    1. template<typename Key,typename Value,typename Hash=std::hash<Key> >
    2. class threadsafe_lookup_table
    3. {
    4. private:
    5. class bucket_type
    6. {
    7. private:
    8. typedef std::pair<Key,Value> bucket_value;
    9. typedef std::list<bucket_value> bucket_data;
    10. typedef typename bucket_data::iterator bucket_iterator;
    11. bucket_data data;
    12. mutable std::shared_mutex mutex; // 1
    13. bucket_iterator find_entry_for(Key const& key) const // 2
    14. {
    15. return std::find_if(data.begin(),data.end(),
    16. [&](bucket_value const& item)
    17. {return item.first==key;});
    18. }
    19. public:
    20. Value value_for(Key const& key,Value const& default_value) const
    21. {
    22. std::shared_lock<std::shared_mutex> lock(mutex); // 3
    23. bucket_iterator const found_entry=find_entry_for(key);
    24. return (found_entry==data.end())?
    25. default_value:found_entry->second;
    26. }
    27. void add_or_update_mapping(Key const& key,Value const& value)
    28. {
    29. std::unique_lock<std::shared_mutex> lock(mutex); // 4
    30. bucket_iterator const found_entry=find_entry_for(key);
    31. if(found_entry==data.end())
    32. {
    33. data.push_back(bucket_value(key,value));
    34. }
    35. else
    36. {
    37. found_entry->second=value;
    38. }
    39. }
    40. void remove_mapping(Key const& key)
    41. {
    42. std::unique_lock<std::shared_mutex> lock(mutex); // 5
    43. bucket_iterator const found_entry=find_entry_for(key);
    44. if(found_entry!=data.end())
    45. {
    46. data.erase(found_entry);
    47. }
    48. }
    49. };
    50. std::vector<std::unique_ptr<bucket_type> > buckets; // 6
    51. Hash hasher;
    52. bucket_type& get_bucket(Key const& key) const // 7
    53. {
    54. std::size_t const bucket_index=hasher(key)%buckets.size();
    55. return *buckets[bucket_index];
    56. }
    57. public:
    58. typedef Key key_type;
    59. typedef Value mapped_type;
    60. typedef Hash hash_type;
    61. threadsafe_lookup_table(
    62. unsigned num_buckets=19,Hash const& hasher_=Hash()):
    63. buckets(num_buckets),hasher(hasher_)
    64. {
    65. for(unsigned i=0;i<num_buckets;++i)
    66. {
    67. buckets[i].reset(new bucket_type);
    68. }
    69. }
    70. threadsafe_lookup_table(threadsafe_lookup_table const& other)=delete;
    71. threadsafe_lookup_table& operator=(
    72. threadsafe_lookup_table const& other)=delete;
    73. Value value_for(Key const& key,
    74. Value const& default_value=Value()) const
    75. {
    76. return get_bucket(key).value_for(key,default_value); // 8
    77. }
    78. void add_or_update_mapping(Key const& key,Value const& value)
    79. {
    80. get_bucket(key).add_or_update_mapping(key,value); // 9
    81. }
    82. void remove_mapping(Key const& key)
    83. {
    84. get_bucket(key).remove_mapping(key); // 10
    85. }
    86. };

    实现中使用了std::vector<std::unique_ptr<bucket_type>>⑥来保存桶,其允许在构造函数中指定构造桶的数量。默认为19个,其是一个任意的质数;哈希表在有质数个桶时,工作效率最高。每一个桶都会被一个std::shared_mutex①实例锁保护,来允许并发读取,或对每一个桶,只有一个线程对其进行修改。

    因为桶的数量是固定的,所以get_bucket()⑦可以无锁调用,⑧⑨⑩也都一样。并且对桶的互斥量上锁,要不就是共享(只读)所有权时③,要不就是在获取唯一(读/写)权时④⑤。这里的互斥量,可适用于每个成员函数。

    这三个函数都使用到了find_entry_for()成员函数②,用来确定数据是否在桶中。每一个桶都包含一个“键值-数据”的std::list<>列表,所以添加和删除数据就会很简单。

    从并发的角度考虑,所有成员都会被互斥锁保护,这样的实现是“异常安全”的吗?value_for是不能修改任何值的,所以其不会有问题;如果value_for抛出异常,也不会对影响任何数据结构。remove_mapping修改链表时,会调用erase,不过这能保证没有异常抛出,也是安全的。那么就剩add_or_update_mapping了,可能会在其两个if分支上抛出异常。push_back是异常安全的,如果有异常抛出,其也会将链表恢复成原来的状态,所以这个分支没有问题。唯一的问题就在赋值阶段,这将替换已有的数据;当复制阶段抛出异常,用于原依赖的始状态没有改变。不过,这不会影响数据结构的整体,以及用户提供类型的属性,所以可以放心的将问题交给用户处理。

    本节开始时,我提到查询表的一个可有可无(nice-to-have)的特性,会将选择当前状态的快照,例如:一个std::map<>。这要求锁住整个容器,保证拷贝副本的状态是可以索引的,这将锁住所有的桶。因为对于查询表的“普通”的操作,需要在同一时间获取桶上的锁,而这个操作将要求查询表将所有桶都锁住。因此,只要每次以相同的顺序进行上锁(例如,递增桶的索引值),就不会产生死锁。实现如下所示:

    清单6.12 获取整个threadsafe_lookup_table作为一个std::map<>

    1. std::map<Key,Value> threadsafe_lookup_table::get_map() const
    2. {
    3. std::vector<std::unique_lock<std::shared_mutex> > locks;
    4. for(unsigned i=0;i<buckets.size();++i)
    5. {
    6. locks.push_back(
    7. std::unique_lock<std::shared_mutex>(buckets[i].mutex));
    8. }
    9. std::map<Key,Value> res;
    10. for(unsigned i=0;i<buckets.size();++i)
    11. {
    12. for(bucket_iterator it=buckets[i].data.begin();
    13. it!=buckets[i].data.end();
    14. ++it)
    15. {
    16. res.insert(*it);
    17. }
    18. }
    19. return res;
    20. }

    清单6.11中的查询表实现,就增大的并发访问的能力,这个查询表作为一个整体,通过单独的操作,对每一个桶进行锁定,并且通过使用std::shared_mutex允许读者线程对每一个桶并发访问。如果细粒度锁和哈希表结合起来,会增加并发的可能性吗?

    下一节中,将使用到一个线程安全列表(支持迭代器)。

    6.3.2 编写一个使用锁的线程安全链表

    链表类型是数据结构中的一个基本类型,所以比较好修改成线程安全的,对么?不好说,这取决于要添加什么样的功能,并且需要提供迭代器的支持。为了简化基本数据类型的代码,我去掉了一些功能。迭代器的问题在于,STL类的迭代器需要持有容器内部属于的引用。当容器可被其他线程修改时,这个引用还是有效的;实际上就需要迭代器持有锁,对指定的结构中的部分进行上锁。在给定STL类迭代器的生命周期中,让其完全脱离容器的控制是很糟糕的做法。

    替代方案就是提供迭代函数,例如:将for_each作为容器本身的一部分。这就能让容器来对迭代的部分进行负责和锁定,不过这将违反第3章指导意见——对避免死锁建议。为了让for_each在任何情况下都有效,持有内部锁的时,必须调用用户提供的代码。不仅如此,需要传递一个对容器中元素的引用到用户代码中,就是让用户代码对容器中的元素进行操作。可以为了避免传递引用,而传出一个拷贝到用户代码中;不过当数据很大时,拷贝要付出的代价也很大。

    所以,可以将避免死锁的工作(因为用户提供的操作需要获取内部锁),还有避免对引用(不被锁保护)进行存储时的条件竞争交给用户去做。这样查询表就可以使用链表了,这样很安全,因为清楚这里的实现不会有任何问题。

    剩下的问题就是哪些操作需要列表所提供。如果愿在花点时间看一下清单6.11和6.12中的代码,会需要下面这些操作:

    • 向列表添加一个元素

    • 当某个条件满足时,从链表中删除某个元素

    • 当某个条件满足时,从链表中查找某个元素

    • 当某个条件满足时,更新链表中的某个元素

    • 将容器中链表中的每个元素,复制到另一个容器中

    提供了这些操作,链表才能为通用容器,这将帮助我们添加更多功能,比如:指定位置上插入元素,不过这对于查询表来说就没有必要了,所以就算是给读者们留的一个作业吧。

    使用细粒度锁最初的想法,是为了让链表每个节点都拥有一个互斥量。当链表很长时,就会使用有很多的互斥量!这样的好处是对于链表中每一个独立的部分,都能实现真实的并发:其真正感兴趣的是对持有的节点群进行上锁,并且在移动到下一个节点的时,对当前节点进行释放。下面的清单中将展示这样的一个链表实现。

    清单6.13 线程安全链表——支持迭代器

    1. template<typename T>
    2. class threadsafe_list
    3. {
    4. struct node // 1
    5. {
    6. std::mutex m;
    7. std::shared_ptr<T> data;
    8. std::unique_ptr<node> next;
    9. node(): // 2
    10. next()
    11. {}
    12. node(T const& value): // 3
    13. data(std::make_shared<T>(value))
    14. {}
    15. };
    16. node head;
    17. public:
    18. threadsafe_list()
    19. {}
    20. ~threadsafe_list()
    21. {
    22. remove_if([](node const&){return true;});
    23. }
    24. threadsafe_list(threadsafe_list const& other)=delete;
    25. threadsafe_list& operator=(threadsafe_list const& other)=delete;
    26. void push_front(T const& value)
    27. {
    28. std::unique_ptr<node> new_node(new node(value)); // 4
    29. std::lock_guard<std::mutex> lk(head.m);
    30. new_node->next=std::move(head.next); // 5
    31. head.next=std::move(new_node); // 6
    32. }
    33. template<typename Function>
    34. void for_each(Function f) // 7
    35. {
    36. node* current=&head;
    37. std::unique_lock<std::mutex> lk(head.m); // 8
    38. while(node* const next=current->next.get()) // 9
    39. {
    40. std::unique_lock<std::mutex> next_lk(next->m); // 10
    41. lk.unlock(); // 11
    42. f(*next->data); // 12
    43. current=next;
    44. lk=std::move(next_lk); // 13
    45. }
    46. }
    47. template<typename Predicate>
    48. std::shared_ptr<T> find_first_if(Predicate p) // 14
    49. {
    50. node* current=&head;
    51. std::unique_lock<std::mutex> lk(head.m);
    52. while(node* const next=current->next.get())
    53. {
    54. std::unique_lock<std::mutex> next_lk(next->m);
    55. lk.unlock();
    56. if(p(*next->data)) // 15
    57. {
    58. return next->data; // 16
    59. }
    60. current=next;
    61. lk=std::move(next_lk);
    62. }
    63. return std::shared_ptr<T>();
    64. }
    65. template<typename Predicate>
    66. void remove_if(Predicate p) // 17
    67. {
    68. node* current=&head;
    69. std::unique_lock<std::mutex> lk(head.m);
    70. while(node* const next=current->next.get())
    71. {
    72. std::unique_lock<std::mutex> next_lk(next->m);
    73. if(p(*next->data)) // 18
    74. {
    75. std::unique_ptr<node> old_next=std::move(current->next);
    76. current->next=std::move(next->next);
    77. next_lk.unlock();
    78. } // 20
    79. else
    80. {
    81. lk.unlock(); // 21
    82. current=next;
    83. lk=std::move(next_lk);
    84. }
    85. }
    86. }
    87. };

    清单6.13中的threadsafe_list<>是一个单链表,可从node的结构①中看出。一个默认构造的node作为链表的head,其next指针②指向的是NULL。新节点都通过push_front()函数添加;构造第一个新节点④,其将会在堆上分配内存③来对数据进行存储,同时将next指针置为NULL。然后,为了设置next的值⑤,需要获取head节点的互斥锁,也就是插入节点到列表的头部,让头节点的head.next指向这个新节点⑥。目前,还没有什么问题:只需要锁住一个互斥量,就能将新的数据添加进入链表,所以不存在死锁的问题。同样,(缓慢的)内存分配操作在锁的范围外,所以锁能保护需要更新的一对指针。那么,再来看一下迭代功能。

    首先,来看一下for_each()⑦。这个操作对队列中的每个元素执行Function(函数指针);大多数标准算法库中,都会通过传值方式来执行这个函数,要不就传入一个通用的函数,要不就传入一个有函数操作的类型对象。这种情况下,函数必须接受类型为T的值作为参数。链表中会有一个“手递手”的上锁过程。这个过程开始时,需要锁住head及节点⑧的互斥量。然后,安全的获取指向下一个节点的指针(使用get()获取,因为对这个指针没有所有权)。当指针不为NULL⑨,为了继续对数据进行处理,就需要对指向的节点进行上锁⑩。当锁住了那个节点,就可以对上一个节点进行释放了⑪,并调用指定函数⑫。当函数执行完成时,就可以更新当前指针所指向的节点(刚刚处理过的节点),并将所有权从next_lk移动移动到lk⑬。因为for_each传递的每个数据都是能被Function接受的,所以当需要的时,或需要拷贝到另一个容器的时,或其他情况时,都可以考虑使用这种方式更新每个元素。如果函数的行为没什么问题,这种方式是安全的,因为在获取节点互斥锁时,已经获取锁的节点正在被函数所处理。

    find_first_if()⑭和for_each()很相似;最大的区别在于find_first_if支持函数(谓词)在匹配的时候返回true,不匹配的时候返回false⑮。条件匹配时,只需要返回找到的数据⑯,而非继续查找。可以使用for_each()来做这件事,不过在找到之后,继续做查找就没意义了。

    remove_if()⑰就有些不同了,因为这个函数会改变链表;所以,就不能使用for_each()来实现这个功能。当函数(谓词)返回true⑱,对应元素将会移除,并且更新current->next⑲。当这些都做完,就可以释放next指向节点的锁。当std::unique_ptr<node>的移动超出链表范围⑳,这个节点将被删除。这种情况下,就不需要更新当前节点了,因为只需要修改next所指向的下一个节点就可以。当函数(谓词)返回false,移动的操作就和之前一样了(21)。

    那么,所有的互斥量中会有死锁或条件竞争吗?答案无疑是“否”,要看提供的函数(谓词)是否有良好的行为。迭代通常都使用一种方式,从head节点开始,并且在释放当前节点锁之前,将下一个节点的互斥量锁住,所以就不可能会有不同线程有不同的上锁顺序。唯一可能出现条件竞争的地方就在remove_if()⑳中删除已有节点的时候。因为,操作在解锁互斥量后进行(其导致的未定义行为,可对已上锁的互斥量进行破坏)。不过,可以确定这的确是安全的,因为现在还持有前一个节点(当前节点)的互斥锁,所以不会有新的线程尝试去获取正在删除节点的互斥锁。

    并发概率有多大呢?细粒度锁要比单锁的并发概率大很多,那我们已经获得了吗?是的,已经获得了:同一时间内,不同线程在不同节点上工作,无论是使用for_each()对每一个节点进行处理,还是使用find_first_if()对数据进行查找,或是使用remove_if()删除一些元素。不过,因为互斥量必须按顺序上锁,线程就不能交叉进行工作。当线程耗费大量的时间对一个特殊节点进行处理,其他线程就必须等待这个处理完成。完成后,其他线程才能到达这个节点。