0%

以std::mutex为基础,探索线程间共享数据的方法

写在前面

多线程编程的优势之一在于线程间共享数据的开销很小。

“灵活地在线程间共享数据,最大化地利用并发处理优势“,听起来很美好,实际上处处是坑。

  • 多个线程对同一个数据结构的读写操作,如果处理不好,就会出现“脏读”、“幻读”等bug(借用数据库中的概念);
  • 使用锁机制来保护共享的数据,如果多个线程分别持有不同的锁,却互相请求对方持有的锁,就会出现“死锁”bug(极难复现调试);
  • 锁的粒度设计太大,就无法充分利用多线程并发的速度优势;
  • 锁的粒度设计太小,出现bug的可能性也随之增加,逼迫程序员付出大量心智成本;

C++ Concurrency in Action认为,使用多线程共享数据的坑主要有两个:

  • problematic race condition
  • deadlock

如何利用C++ STL和boost中的facilities来规避大坑、并且设计出良好的代码结构,是这里讨论的重点。

Race Condition

如果数据是read-only的,那么多线程操作不会带来任何问题,但如果可读可写,那么多线程读写的顺序就会带来一系列问题(毕竟,不加锁的情况下,没法控制多个并发线程执行顺序),这里就引入了“线程竞态”(race condition)的概念。

什么是race condition,它会给多线程编程带来什么?

如果多个线程执行同一操作的结果依赖于多个线程的执行顺序(顺序不同,结果不同),那么这就叫race condition。

race condition并不一定是坏事,多个用户线程的竞争用在抢票系统、抢红包系统上就没有问题。真正有问题的是problematic race condition。C++标准中定义的data race就是一种problematic race condition,它表示并发地对同一个对象进行修改,这是一个undefined behavior。

problematic race condition常常发生在一个操作要修改多个数据的情形下。例如,在删除双向链表中的一个元素时,我们需要对三个链表元素(节点a、待删除节点b、节点c)进行操作,那么在删除的过程中,

  1. 必然会先破坏原有数据结构(节点a的后指针指向节点c,但节点c的前指针还指向待删除的节点b),
  2. 然后再恢复数据结构(节点a、c相连,但节点b的前指针和后指针还没变),
  3. 最后进行删除操作(delete 节点b),

在这一过程中,其他线程可能在删除操作尚未结束时读取到错误的链表数据。

如何防止problematic race condition

C++ Concurrency in Action介绍了三种防止problematic race condition的方法:

  1. 在数据结构外包裹一层保护机制(protection mechanism)
  2. 修改数据结构的接口,让修改操作原子化(indivisible)
  3. 把修改操作当做“事务”(transaction)来处理,实现all-or-nothing的效果

利用 std::mutex 的相关机制,可以灵活地选用方法1或方法2,来设计出线程安全的数据结构。

线程安全的list

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
#include <list>
#include <mutex>
#include <algorithm>

std::list<int> some_list;
std::mutex some_mutex;

void add_to_list(int new_value) {
std::lock_guard<std::mutex> guard(some_mutex);
some_list.push_back(new_value);
}

bool list_contains(int value_to_find) {
std::lock_guard<std::mutex> guard(some_mutex);
return std::find(some_list.begin(), some_list.end(), value_to_find)
!= some_list.end();
}

在这个例子中,被保护的数据结构是 some_listsome_mutex 被用来标识临界区代码。

如果某个线程对 some_mutex 上锁,那么其他线程只能等待这个线程解锁,否则无法执行 some_mutex 对应的临界区代码。这样就起到了保护 some_list 内数据不被同时读写的效果。

std::lock_guard<> 对象充分利用了RAII的特性,对象初始化后就上锁,对象被析构时自动解锁,不需要程序员来手动lock和unlock。这是一种好的编程习惯,尽可能地解放程序员,把mutex上锁的周期与另一个对象的生命周期绑定。

因为C++语言的灵活性,std::mutex 的锁机制并不是一劳永逸的,

  • 如果被保护的数据被指针传递、引用传递到临界区以外后,就不再受到保护,
  • 如果临界区代码需要调用用户自定义的函数(恶意地取出数据地址),那么 std::mutex 也无法起到保护作用。

线程安全的stack

在讨论线程安全的stack前,有必要回顾一下 std::stack 在设计时避开的坑:为什么 .pop() 操作不返回栈顶元素,而需要让用户先调用 .top() 取出栈顶元素,再 .pop() 出栈?

为了节约篇幅,这里不作讨论,直接放上结论:

  • 如果 .pop() 操作允许返回栈顶元素的值,那么必然要把该元素的值copy出来,
  • 那么,auto a = s.pop() 操作实际上分成了两步:
    • 修改 s 的数据结构,去掉栈顶元素,
    • 调用栈顶元素的 copy assignment operator ,给 a 赋值,
  • 大坑在第二步:如果给 a 赋值的操作 抛出异常 ,那么就会出现 栈顶元素已经抛出,但是无法返回 的情形,那么相当于我们白白丢失了栈顶元素(既不在栈里,也不在栈外),而且没有人知道它的值是啥,这个错误甚至是无法补救的,
  • 为了防止这个大坑,标准库的设计者把出栈操作设计成两步,让用户先读取,后出栈,

因此,在 std::stack 的基础上设计线程安全的栈,就是要给 .top() + .pop() 套一层保护机制,既要保护多线程读取的安全性、又要考虑拷贝操作的脆弱性。

比较直观的方法是逼迫用户只使用copy/move操作不会抛出异常的类,这种方法局限性比较大。以下两种方法更加自然一些。

方法一:返回一个 std::shared_ptr<>

1
2
3
4
5
6
7
8
9
10
std::mutex some_mutex;
std::stack<T> data;

std::shared_ptr<T> pop() {
std::lock_guard<std::mutex> guard(some_mutex);
if (data.empty()) throw empty_stack();
std::shared ptr<T> const res(std::make_shared<T>(data.top()));
data.pop();
return res;
}

这种方法相当于避开了“拷贝-销毁”的流程,始终只保留一个对象。

方法二:引用传递

1
2
3
4
5
6
void pop(T& value) {
std::lock_guard<std::mutex> guard(some_mutex);
if (data.empty()) throw empty_stack();
value = data.top();
data.pop();
}

引用传递的好处是如果copy过程抛出了异常,那么stack就不会被修改。坏处是用户需要提前构造好一个空对象。

Dead Lock

死锁是一种非常有趣的bug,多个线程在持有锁的同时互相请求对方持有的锁,程序就卡死在这里。

这个概念第一次在操作系统中学习到,当时探讨了避免死锁、检测死锁、消除死锁的方法,但是在多线程编程中,最好还是从代码设计上根本性地杜绝死锁的产生,毕竟死锁的debug相当相当困难。

如何防止代码产生死锁?

死锁一般产生于一个操作需要对多个mutex上锁的场景。C++ Concurrency in Action给出了四条避免死锁的编程指南:

  1. 不要给已经上锁的部分再加锁,
  2. 上锁后避免调用用户自定义的函数,
  3. 如果需要给多个mutex上锁,那就要保证每次操作时加锁的顺序一致,
  4. 将代码按层次切分,每个层次加不同的锁,确保一个固定的顺序(例如,从高层次锁->低层次锁,否则报错),

避免死锁需要类的设计者和类的使用者严格地自律、遵守规则。

以下探讨如何利用C++ STL和Boost的facilities来设计无死锁的代码。

线程安全的swap v1.0

1
2
3
4
5
6
7
friend void swap(X& lhs, X& rhs) {
if (&lhs == &rhs) return;
std::lock(lhs.m, rhs.m);
std::lock_guard<std::mutex> lock_a(lhs.m, std::adopt_lock);
std::lock_guard<std::mutex> lock_a(rhs.m, std::adopt_lock);
swap(lhs.some_detail, rhs.some_detail);
}

std::lock()的上锁操作是原子化的,能同时对多个mutex上锁。

std::adopt_lock 作为参数传入 std::lock_guard<> ,就是告诉 lock_alock_b 只需要获得当前mutex的拥有权,不要再给mutex上锁。

如果 std::lock() 对任何一个mutex上锁失败,那么已经上锁的部分会被释放,实现all-or-nothing的效果。

层次锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
hierarchical_mutex high_level_mutex(10000);
hierarchical_mutex low_level_mutex(5000);

int do_low_level_stuff();

int low_level_func() {
std::lock_guard<hierarchical_mutex> lk(low_level_mutex);
return do_low_level_stuff();
}

void do_high_level_stuff(int some_param);

void high_level_func() {
std::lock_guard<hierarchical_mutex> lk(high_level_mutex);
do_high_level_stuff(low_level_func());
}

把代码分层次,用不同的mutex来控制不同层次的锁。

当低层次mutex被锁时,高层次mutex不能再上锁,否则在runtime报错。

10000,5000指的是layer number(第几层),数字越大越先上锁。

这个 hierarchical_mutex 是自定义的,实现代码比较冗长,这里不展开了。实现的核心是三个变量:

  • 常量hierarchy_value保存本mutex的层号
  • thread_local static变量保存本线程当前运行的层号
  • 变量previous_hierarchy_value储存本线程之前的层号

线程安全的swap v2.0

1
2
3
4
5
6
7
friend void swap(X& lhs, X& rhs) {
if (&lhs == &rhs) return;
std::unique_lock<std::mutex> lock_a(lhs.m, std::defer_lock);
std::unique_lock<std::mutex> lock_b(rhs.m, std::defer_lock);
std::lock(lock_a, lock_b);
swap(lhs.some_detail, rhs.some_detail);
}

std::unique_lock 可以不拥有mutex(传入 std::defer_lock 参数),只在需要的时候请求对指定的mutex上锁。

如果 std::unique_lock 拥有mutex,那么析构时unlock,否则,析构时一定不unlock。

std::unique_lock 对mutex的拥有权可以用过 .owns_lock() 来查询。

std::unique_lock 是典型的可movable不可copyable的类,可以灵活使用它的move semantics来转移mutex的所有权,范例如下:

1
2
3
4
5
6
7
8
9
10
11
std::unique_lock<std::mutex> get_lock() {
extern std::mutex some_mutex;
std::unique_lock<std::mutex> lk(some_mutex);
prepare_data();
return lk;
}

void process_data() {
std::unique_lock<std::mutex> lk(get_lock());
do_something();
}

std::unique_lock 支持灵活地lock与unlock操作,例如:

1
2
3
4
5
6
7
8
void get_and_process_data() {
std::unique_lock<std::mutex> my_lock(the_mutex);
some_class data_to_process = get_next_data_chunk();
my_lock.unlock();
result_type result = process(data_to_process);
my_lock.lock();
write_result(data_to_process, result);
}

选择合适的锁粒度

锁的粒度取决于两个因素:

  1. 上锁的时间长短
  2. 对哪些操作上锁

一般来说,最佳的上锁时长 = 完成指定操作的最小时长。

另外,应该尽可能避免在已经上锁的情况下进行文件IO或者请求另一个锁(耗时过长);上锁时长必须覆盖整个操作过程,否则就会产生race condition。

其他保护数据的机制

线程安全的对象初始化

有些对象的初始化操作开销很大,需要耗费很长时间,比如数据库的connection初始化。针对这类对象,我们可以把它们设置成lazy initialization的,不需要的时候(离线访问本地缓存)只要维护一个空的 std::shared_ptr<> 指针,需要的时候(连接远程数据库同步)再给它 new 一个新的对象。

但在多线程编程中,麻烦就来了:

  • 对象可能还没初始化、也可能正在初始化过程中、也可能刚刚初始化完毕,而其他线程并不知道,
  • 如果多个线程对同一个指针执行 new 操作,就可能出现 data race (undefined behavior),
  • 如果要把“对象是否已经被初始化or正在被初始化”这个flag信息在多个线程之间同步,就要引入一个线程安全的bool变量,

C++ 标准考虑到了这一麻烦之处,并提供了 std::once_flagstd::call_once 来实现线程安全的对象初始化。

1
2
3
4
5
6
7
8
9
10
11
std::shared_ptr<some_resource> resource_ptr;
std::once_flag resource_flag;

void init_resource() {
resource_ptr.reset(new some_resource);
}

void foo() {
std::call_once(resource_flag, init_resource);
resource_ptr->do_something();
}

线程安全的局部静态变量初始化

局部的static类型变量在C/C++语言中是很有意思的存在,当程序第一次执行到变量声明代码时,该变量即被创建,此后一直不销毁。

但在多线程编程中,这一特性也引来一个棘手之处:如果多个线程并发执行,怎么保证局部static类型变量只被初始化一次?

许多古老的C++编译器在这一点上都束手无策,幸好,C++11强制保证了local static variable的初始化是线程安全的 ,当第一个线程开始初始化时,其他线程自动hang住等待,并且保证只会初始化一次。

共享锁和排它锁

对象的读和写频率并不一定是相等的,针对读多写少的对象,我们完全可以允许多个线程对它进行并发地“读“,而只允许一个线程对它进行“写”(此时不允许其他线程读or写)。不难看出,这种情况下读写操作的权重是不一样的,我们需要用两种不同的锁来限制并发读写的权限。

这里就引入了“共享锁”(shared lock)和“排它锁”(exclusive lock)的概念(数据库中第一次学到):

  • 如果任何线程持有共享锁,其他线程对排它锁的请求被阻塞,直到所有的共享锁被释放,
  • 如果任何线程有排它锁,其他线程就不能再请求排它锁or共享锁,直到排它锁被释放,

C++ STL暂时还没有共享锁的实现,但可以使用boost库中的 boost::shared_lock<boost::shared_mutex>

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
#include <map>
#include <string>
#include <mutex>
#include <boost/thread/shared_mutex.hpp>

class dns_entry;

class dns_cache {
std::map<std::string, dns_entry> entries;
mutable boost::shared_mutex entry_mutex;

public:
dns_entry find_entry(std::string const &domain) const {
boost::shared_lock<boost::shared_mutex> lk(entry_mutex);
std::map<std::string, dns_entry>::const_iterator const it = entries.find(domain);
return (it == entries.end()) ? dns_entry(): it->second;
}

void update_or_add_entry(std::string const &domain, dns_entry const &dns_details) {
std::lock_guard<boost::shared_mutex> lk(entry_mutex);
entries[domain] = dns_details;
}
};

在以上代码中,

  • boost::shared_lock<boost::shared_mutex> 实现了对共享锁的加锁,
  • std::lock_guard<boost::shared_mutex> 实现了对排它锁的加锁。

可重入的mutex

C++ STL提供了 std::recursive_mutex ,支持一个线程对它上n次锁,但该线程必须释放n次锁后,其他线程才能对它上锁。

尽可能不要使用可重入的mutex,这是一种糟糕的设计。

总结与思考:关于lock_guard和unique_lock

为了厘清一些叙述上的模糊,有必要再把一些概念拎出来说清楚,以下是我自己的理解。

为什么要使用它们?

mutex提供了lock和unlock的操作,程序员的确可以直接对mutex加锁,但是在复杂代码中,unlock操作需要考虑很多情况(异常处理等),使用lock_guard和unique_lock将unlock的控制权交给了析构函数,解放了程序员。

它们如何与某个mutex绑定?

lock_guard和unique_lock在初始化时,就与参数中传入的mutex相互绑定(associated)。

它们如何获得某个mutex的所有权?

  • 对lock_guard来说,在初始化时就获得了mutex的所有权;

  • 对unique_lock来说,

    • 如果初始化时不传入其他参数,即获得mutex的所有权,
    • 如果初始化时传入 std::defer_lock ,那么只相互绑定,不获取所有权,

它们如何对绑定的mutex加锁?

  • 对lock_guard来说,有两种加锁方式,
    • 先使用 std::lock() 或其他方式加锁,然后初始化lock_guard对象,并传入额外参数 std::adopt_lock 表示只获取所有权,不重复加锁,
    • 在初始化时不传入额外参数,直接绑定+获取所有权+加锁,
  • 对unique_lock来说,也有两种加锁方式,
    • 在初始化时不传入额外参数,直接绑定+获取所有权+加锁,
    • 在初始化时传入额外参数 std::defer_lock ,表示只相互绑定,不获取所有权,不加锁,之后再使用 std::lock() 或者 .lock() 方法进行加锁(加锁的同时也获得了所有权),

它们如何对绑定的mutex解锁?

  • 对lock_guard来说,析构时自动对拥有的mutex解锁,
  • 对unique_lock来说,
    • 析构时,如果unique_lock拥有mutex的所有权,那么自动解锁,
    • 析构时,如果unique_lock不拥有mutex的所有权,那么不解锁,
    • 程序员可以手动对已经上锁的unique_lock调用 .unlock() ,调用完后解锁+丧失所有权。