@ltlovezh
        
        2020-02-15T09:29:43.000000Z
        字数 16568
        阅读 1513
    C++
C++ 11增加了标准线程库:std::thread,在语言级别上提供了线程支持,并且是跨平台的。在不同操作系统上,依赖于平台本身的线程库,例如Linux上,底层实现是pthread库。 
std::thread禁止了拷贝构造函数和拷贝赋值运算符,所以std::thread对象不能拷贝,但是可以移动。
一个最简单的实例:
void func(int arg1, int arg2) {cout << "arg1: " << arg1 << ", arg2: " << arg2 << endl;cout << "child thread id: " << std::this_thread::get_id() << endl;}int main() {// 创建并启动子线程std::thread thread(func, 10, 100);cout << "child thread id: " << thread.get_id() << endl;thread.join();cout << "main thread id: " << std::this_thread::get_id() << endl;cout << "main thread exit" << endl;return 0;}// 输出child thread id: 0x70000e841000arg1: 10, arg2: 100child thread id: 0x70000e841000main thread id: 0x10ef98dc0main thread exit
std::thread线程对象支持的操作如下所示:
std::thread实现和操作系统相关,因此该函数返回std::thread 底层实现的线程句柄,例如:在Posix标准平台下,就表示Pthread句柄pthread_t。std::this_thread表示当前线程,this_thread实际是一个namespace,支持如下操作:
下面通过sleep_for和sleep_until分别实现休眠10秒:
// 当前时间戳std::time_t timestamp = std::chrono::system_clock::to_time_t(system_clock::now());// sleep_until实现休眠10秒std::this_thread::sleep_until(system_clock::from_time_t(timestamp + 10));// sleep_for实现休眠10秒std::this_thread::sleep_for(std::chrono::seconds(10));
关于时间的操作可参考time.h文件
// 当前时间戳(秒)std::time_t now_timestamp = system_clock::to_time_t(system_clock::now());// struct tm结构体,包含了时、分、秒struct std::tm *now_tm = std::localtime(&now_timestamp);std::cout << "Current time: " << now_tm->tm_hour << ":" << now_tm->tm_min << ":"<< now_tm->tm_sec << '\n';// 未来一分钟++now_tm->tm_min;// 未来时间戳std::time_t future_timestamp = std::mktime(now_tm);std::cout << "Future time: " << now_tm->tm_hour << ":" << now_tm->tm_min << ":"<< now_tm->tm_sec << '\n';// time_point格式,可用于std::this_thread::sleep_untilstd::chrono::time_point<system_clock> x = system_clock::from_time_t(future_timestamp);// 输出Current time: 19:3:22Future time: 19:4:22
通过线程锁,可以实现临界区访问,同一个时刻只有一个线程可以访问临界区。
std::mutex表示互斥锁,不可拷贝(删除了拷贝构造函数和拷贝赋值操作符)。不支持递归锁定,若有此需求,可使用std::recursive_mutex代替。
支持的操作:
recursive_mutex代替,它允许同一个线程多次锁定同一个recursive_mutex。recursive_mutex代替,它允许同一个线程多次锁定同一个recursive_mutex。lock和try_lock的差异主要是不能锁定mutex时表现不同,lock函数会一直阻塞调用线程,直到可以锁定mutex为止;而try_lock则不会阻塞调用线程,而是直接返回,并且返回值为false。
recursive_mutex在mutex的基础上,允许同一个线程对同一个recursive_mutex多次加锁,表示获得recursive_mutex的多层所有权,同时对recursive_mutex解锁时,也要调用相同次数的unlock,这样调用线程才能彻底释放对recursive_mutex的所有权,其他线程才能锁定recursive_mutex。
timed_mutex在mutex基础上,增加了两个成员函数try_lock_for和try_lock_until,表示等待一段时间,尝试获得锁。
recursive_timed_mutex代替,它允许同一个线程多次锁定同一个recursive_timed_mutex。try_lock_for一致,只不过参数是绝对时间。
std::timed_mutex timed_mutex;// 尝试2秒内获得锁if (timed_mutex.try_lock_for(std::chrono::seconds(2))) {// do some thing// 释放锁timed_mutex.unlock();}// 当前绝对时间戳std::time_t timestamp = std::chrono::system_clock::to_time_t(system_clock::now());// 尝试阻塞到未来的绝对时间获得锁if (timed_mutex.try_lock_until(system_clock::from_time_t(timestamp + 10))) {// do some thing// 释放锁timed_mutex.unlock();}
可重入的时间锁,同时具备timed_mutex和recursive_mutex的能力,不再赘述。
针对具备不同能力的锁,C++划分了三种层级:
lock_guard是一个管理mutex的对象,不可拷贝(删除了拷贝构造函数和拷贝赋值操作符),除了缺省合成的函数外,没有其他成员函数。构造lock_guard时,mutex被调用线程锁定,销毁lock_guard时,mutex被调用线程解锁。通过这种方式,可以确保程序抛出异常时,也能正确地解锁mutex。这里的mutex可以是四种锁中的任意一种。 
lock_guard不会介入mutex生命周期,程序必须保证mutex的生命周期至少延长到持有它的lock_guard销毁为止。
简单来说,构造
lock_guard时,获得锁;析构lock_guard时,释放锁。
除了单参数构造函数,lock_guard还有一个包含两个参数的构造函数: 
lock_guard(mutex_type& __m, adopt_lock_t),表示创建lock_guard时,构造函数不会对mutex加锁,而是由外部程序保证mutex已经被加锁了。
下面是一个典型案例:
// 互斥锁std::mutex mtx;void print_even(int x) {if (x % 2 == 0) std::cout << x << " is even\n";else throw (std::logic_error("not even"));}void print_thread_id(int id) {try {// 使用lock_guard锁定mtx,保证即使是异常逻辑,lock_guard也可以在析构时解锁mtxstd::lock_guard<std::mutex> lck(mtx);print_even(id);}catch (std::logic_error &) {std::cout << "[exception caught]\n";}}int main() {std::thread threads[10];for (int i = 0; i < 10; ++i)threads[i] = std::thread(print_thread_id, i + 1);for (auto &th : threads) th.join();return 0;}// 可能的输出[exception caught]6 is even4 is even[exception caught][exception caught]2 is even[exception caught]8 is even[exception caught]10 is even
unique_lock是一个管理mutex的对象,不可拷贝(删除了拷贝构造函数和拷贝赋值操作符),在lock_guard基础上,增加了lock、try_lock、try_lock_for、try_lock_until和unlock等成员函数(这些函数的作用在上面👆已经介绍过了),更加灵活,但相应的性能会受一些影响。 
下面是源码中的类定义:
template <class _Mutex>class unique_lock{public:// 模板参数,各类锁typedef _Mutex mutex_type;private:// 锁mutex_type* __m_;// 是否已经获得锁bool __owns_;public:// 无参构造函数unique_lock() : __m_(nullptr), __owns_(false) {}// 单参数构造函数,与lock_guard一样,构造函数中加锁explicit unique_lock(mutex_type& __m): __m_(_VSTD::addressof(__m)), __owns_(true) {__m_->lock();}// 带defer_lock_t的构造函数(第二个参数可使用编译时常量std::defer_lock),构造函数中不加锁unique_lock(mutex_type& __m, defer_lock_t) _NOEXCEPT: __m_(_VSTD::addressof(__m)), __owns_(false) {}// 带try_to_lock_t的构造函数(第二个参数可使用编译时常量std::try_to_lock),构造函数中尝试加锁unique_lock(mutex_type& __m, try_to_lock_t): __m_(_VSTD::addressof(__m)), __owns_(__m.try_lock()) {}// 带adopt_lock_t的构造函数(第二个参数可使用编译时常量std::adopt_lock),与lock_guard一样,构造函数中不加锁,而是默认外部程序已经加锁了unique_lock(mutex_type& __m, adopt_lock_t): __m_(_VSTD::addressof(__m)), __owns_(true) {}template <class _Clock, class _Duration>// 构造函数中通过try_lock_until尝试加锁unique_lock(mutex_type& __m, const chrono::time_point<_Clock, _Duration>& __t): __m_(_VSTD::addressof(__m)), __owns_(__m.try_lock_until(__t)) {}template <class _Rep, class _Period>// 构造函数中通过try_lock_for尝试加锁unique_lock(mutex_type& __m, const chrono::duration<_Rep, _Period>& __d): __m_(_VSTD::addressof(__m)), __owns_(__m.try_lock_for(__d)) {}// 析构时,若加锁了,则释放锁~unique_lock(){if (__owns_)__m_->unlock();}private:// 相当于删除了拷贝构造函数和拷贝赋值操作符unique_lock(unique_lock const&); // = delete;unique_lock& operator=(unique_lock const&); // = delete;public:#ifndef _LIBCPP_CXX03_LANG// 移动构造函数unique_lock(unique_lock&& __u) _NOEXCEPT: __m_(__u.__m_), __owns_(__u.__owns_){__u.__m_ = nullptr; __u.__owns_ = false;}// 移动赋值操作符unique_lock& operator=(unique_lock&& __u) _NOEXCEPT{if (__owns_)__m_->unlock();__m_ = __u.__m_;__owns_ = __u.__owns_;__u.__m_ = nullptr;__u.__owns_ = false;return *this;}#endif // _LIBCPP_CXX03_LANG// 主动加锁void lock();// 尝试加锁bool try_lock();template <class _Rep, class _Period>bool try_lock_for(const chrono::duration<_Rep, _Period>& __d);template <class _Clock, class _Duration>bool try_lock_until(const chrono::time_point<_Clock, _Duration>& __t);// 释放锁void unlock();// swap实现void swap(unique_lock& __u) _NOEXCEPT{_VSTD::swap(__m_, __u.__m_);_VSTD::swap(__owns_, __u.__owns_);}// 返回持有的mutex,但是该函数不会解锁mutexmutex_type* release() _NOEXCEPT{mutex_type* __m = __m_;__m_ = nullptr;__owns_ = false;return __m;}// 判断是否已经加锁了bool owns_lock() const _NOEXCEPT {return __owns_;}// 重载了函数调用符, 判断是否已经加锁了operator bool () const _NOEXCEPT {return __owns_;}// 获得持有的mutexmutex_type* mutex() const _NOEXCEPT {return __m_;}};
unique_lock的构造函数,实现了不同的加锁策略,具体可见上面的源码和注释。
除非必要,优先使用更高效的lock_guard。
call_once是全局函数模板,原型如下所示:
template <class Fn, class... Args>void call_once (once_flag& flag, Fn&& fn, Args&&... args);
call_once使用参数args调用fn函数,除非另一个线程已经(或者正在)使用相同的once_flag调用call_once。
第一个使用相同once_flag调用call_once的线程,会执行fn函数,同时使其他使用相同once_flag调用call_once的线程进入被动执行状态,即:其他线程不执行fn函数,但是会阻塞到第一个执行fn函数的线程结束。
如果通过call_once执行fn函数的线程抛出了异常,并且存在被动执行的线程,则会从其中选择一个线程使其执行fn函数。
如果已经有线程执行完了
call_once,即fn函数返回了,那么当前所有被动执行的线程和将来对call_once的调用(使用相同once_flag)也会立即返回,不会再次执行fn函数。
即多线程环境下,相同once_flag的call_once调用,只会执行一次fn函数。
下面看一个实际案例:
int winner;void set_winner(int x) {winner = x;std::cout << "set_winner, x = " << x << std::endl;}// 多个线程共用一个once_flagstd::once_flag winner_flag;// 线程函数void wait_1000ms(int id) {// 循环1000次,每次休眠1msfor (int i = 0; i < 1000; ++i)std::this_thread::sleep_for(std::chrono::milliseconds(1));// 不同线程通过同一个once_flag,调用call_once,最终只会有一个线程执行对应的set_winner函数std::call_once(winner_flag, set_winner, id);}int main() {std::thread threads[10];// 10个线程for (int i = 0; i < 10; ++i)threads[i] = std::thread(wait_1000ms, i + 1);std::cout << "waiting for the first among 10 threads to count 1000 ms...\n";for (auto &th : threads) th.join();std::cout << "winner thread: " << winner << '\n';return 0;}// 可能的输出waiting for the first among 10 threads to count 1000 ms...set_winner, x = 8winner thread: 8
可见,虽然运行了10个线程,但是最终只有一个线程执行了set_winner函数。
lock是全局函数模板,原型如下所示:
template <class Mutex1, class Mutex2, class... Mutexes>void lock (Mutex1& a, Mutex2& b, Mutexes&... cde);
函数会锁定所有的mutex,必要时阻塞调用线程。
lock函数以不确定的顺序调用所有mutex的成员函数:lock、try_lock和unlock,以确保函数返回时,所有mutex都被锁定了(而不会产生任何死锁)。
如果lock函数不能锁定所有mutex(例如:其中一个调用抛出了异常),那么在函数失败之前,首先会解锁它成功锁定的所有mutex。
下面👇是同时锁定foo和bar的案例:
std::mutex foo, bar;void task_a() {// 若使用foo.lock(); bar.lock(); 而不是std::lock,那么有可能task_a锁定了foo,而task_b锁定了bar,从而导致死锁// foo.lock(); bar.lock();std::lock(foo, bar);std::cout << "task a\n";foo.unlock();bar.unlock();}void task_b() {// 若使用bar.lock(); foo.lock(); 而不是std::lock,那么有可能task_a锁定了foo,而task_b锁定了bar,从而导致死锁// bar.lock(); foo.lock();std::lock(bar, foo);std::cout << "task b\n";bar.unlock();foo.unlock();}int main() {std::thread th1(task_a);std::thread th2(task_b);th1.join();th2.join();return 0;}
try_lock是全局函数模板,原型如下所示:
// int返回值:若成功锁定了所有mutex,则返回-1;否则返回加锁失败的mutex的索引值template <class Mutex1, class Mutex2, class... Mutexes>int try_lock (Mutex1& a, Mutex2& b, Mutexes&... cde);
函数尝试通过mutex.try_lock锁定所有的mutex。
try_lock函数通过mutex.try_lock成员函数依次为参数中的mutex加锁(首先是a,然后是b,最后是cde),直到所有调用都成功,或者任意一个调用失败(即mutex.try_lock返回false或抛出异常)。
如果try_lock函数由于某个mutex加锁失败而返回,则会解锁所有先前加锁成功的mutex,并且返回那个加锁失败的mutex的索引值。
std::mutex foo, bar;void task_a() {foo.lock();std::cout << "task a\n";bar.lock();// ...foo.unlock();bar.unlock();}void task_b() {int x = try_lock(bar, foo);if (x == -1) {std::cout << "task b\n";// ...bar.unlock();foo.unlock();} else {std::cout << "[task b failed: mutex " << (x ? "foo" : "bar") << " locked]\n";}}int main() {std::thread th1(task_a);std::thread th2(task_b);th1.join();th2.join();return 0;}
通过线程锁只能实现临界区访问,要实现线程之间的同步,需要借助条件变量。
condition_variable是一个同步原语,能够阻塞调用线程,直到其他线程通知恢复为止,不可拷贝(删除了拷贝构造函数和拷贝赋值操作符)。 
当调用condition_variable任意wait函数时,将使用unique_lock(通过mutex)锁定线程。该线程将一直处于阻塞状态,直到另一个线程调用同一个condition_variable任意notify函数唤醒为止。
condition_variable对象总是使用std::unique_lock<mutex>实现线程阻塞。
condition_variable可用的wait函数:
wait有两个重载版本,如下所示:
void wait (unique_lock<mutex>& lck);template <class Predicate>void wait (unique_lock<mutex>& lck, Predicate pred);
表示阻塞调用线程(调用线程调用wait之前必须已经锁定了unique_lock持有的mutex),直到被其他线程唤醒。 
该函数阻塞调用线程的时刻,会自动调用unique_lock.unlock解锁mutex,以允许其他线程加锁同一个mutex继续运行。 
一旦被其他线程唤醒,该函数会解除阻塞状态,并且调用unique_lock.lock重新加锁(可能会再次阻塞调用线程),让unique_lock恢复到wait函数被调用时的状态。
通常情况下,其他线程调用condition_variable的notify_one或者notify_all成员函数,来唤醒被阻塞的线程。但是,某些实现可能会在不调用任何notify函数的情况下产生虚假唤醒。因此,使用该函数的程序应该确保其恢复条件得到了满足,所以一般在循环结构中调用wait函数,如下所示,这样即使被虚假唤醒,也会因为条件不满足,再次进入阻塞状态。
while(条件不满足){condition_variable.wait}
包含_Predicate __pred参数的重载版本中,_Predicate是函数模板的参数,表示返回布尔值的函数。如果__pred返回false,则函数会一直阻塞,只有当它返回true时,notify才能唤醒线程。__pred会一直被调用,直到它返回true,非常适合处理虚假唤醒问题,如下所示:
template <class _Predicate>voidcondition_variable::wait(unique_lock<mutex>& __lk, _Predicate __pred){// __pred()返回false,则一直调用wait阻塞while (!__pred())wait(__lk);}
整体来看,wait类函数有三个注意点:
这样,wait类函数在被调用前和被唤醒返回后,可以确保是同一个线程状态。
下面👇看一个案例:
std::mutex mtx;std::condition_variable cv;int cargo = 0;bool shipment_available() { return cargo != 0; }void consume(int n) {for (int i = 0; i < n; ++i) {std::unique_lock<std::mutex> lck(mtx);cv.wait(lck, shipment_available);// consume:std::cout << cargo << '\n';cargo = 0;}}int main() {std::thread consumer_thread(consume, 10);// produce 10 items when needed:for (int i = 0; i < 10; ++i) {while (shipment_available()) std::this_thread::yield();std::unique_lock<std::mutex> lck(mtx);cargo = i + 1;cv.notify_one();}consumer_thread.join();return 0;}// 输出12345678910
子线程因为cargo等于0,会进入阻塞状态,然后主线程修改cargo,并且唤醒子线程。
wait_for有两个重载版本,如下所示:
enum class cv_status{no_timeout,timeout}// 如果因为时间到了停止阻塞,则返回timeout,否则返回no_timeoutcv_status wait_for(unique_lock<mutex>& __lk, const chrono::duration<_Rep, _Period>& __d);// 返回pred(),而不管是否触发了超时(尽管只有触发超时时,才能为false)。bool wait_for (unique_lock<mutex>& lck, const chrono::duration<Rep,Period>& rel_time, Predicate pred);
wait_for函数在wait基础上,增加了阻塞持续时间的能力,所以有两种方式被唤醒:
👇下面看一个实际案例:
std::condition_variable cv;int value;void read_value() {std::cin >> value;cv.notify_one();}int main() {std::cout << "Please, enter an integer (I'll be printing dots): \n";std::thread th(read_value);std::mutex mtx;std::unique_lock<std::mutex> lck(mtx);while (cv.wait_for(lck, std::chrono::seconds(1)) == std::cv_status::timeout){std::cout << '.' << std::endl;}std::cout << "You entered: " << value << '\n';th.join();return 0;}
主线程每次阻塞1秒,若wait_for是以超时结束,则打印.,并且再次进入阻塞状态,直到被子线程主动唤醒退出while循环,结束主线程。
wait_until有两个重载版本,如下所示:
enum class cv_status{no_timeout,timeout}// 如果因为时间到了停止阻塞,则返回timeout,否则返回no_timeoutcv_status wait_until (unique_lock<mutex>& lck, const chrono::time_point<Clock,Duration>& abs_time);// 返回pred(),而不管是否触发了超时(尽管只有触发超时时,才能为false)。bool wait_until (unique_lock<mutex>& lck, const chrono::time_point<Clock,Duration>& abs_time, Predicate pred);
wait_until函数在wait基础上,增加了阻塞到绝对时间的能力,所以有两种方式被唤醒:
解除当前正在等待指定condition_variable的所有线程的阻塞状态。如果没有线程在等待,函数将什么也不做。
唤醒所有等待的线程
解除当前正在等待指定condition_variable的所有线程中任意一个线程的阻塞状态。如果没有线程在等待,函数将什么也不做。
随机唤醒一个等待的线程
并不强制,可以根据具体情况,决定是否需要加锁
通知线程调用notify_all或者notify_one时,不需要提前加锁(与等待线程锁定的同一个mutex)。实际上,先加锁后通知是一种悲观做法,因为被通知的等待线程会立即再次阻塞,等待通知线程释放锁。 
然而,一些实现(尤其是pthreads)认识到这种情况,并通过在notify中将等待线程从condition_variable队列直接转移到mutex队列,而不唤醒它,从而避免这种“匆忙等待”的场景。
但是若需要精确的事件调度,那么先加锁后通知是有必要的,例如:等待线程将在满足条件后直接退出程序,这将导致通知线程的condition_variable被销毁,为了不让等待线程立即获得锁,那么在加锁状态下进行通知可能是有必要的。
condition_variable的wait/wait_for/wait_until函数只能以unique_lock作为参数,但是condition_variable_any的wait/wait_for/wait_until函数可以以任何Lockable类型的锁作为参数。除此之外,两者的能力完全相同。看下源码就知道:condition_variable_any内部也是通过condition_variable和unique_lock<mutex>实现的。 
下面使用condition_variable_any改造上面condition_variable.wait处的案例。
std::mutex mtx;std::condition_variable_any cv;int cargo = 0;bool shipment_available() { return cargo != 0; }void consume(int n) {for (int i = 0; i < n; ++i) {mtx.lock();cv.wait(mtx, shipment_available);// consume:std::cout << cargo << '\n';cargo = 0;mtx.unlock();}}int main() {std::thread consumer_thread(consume, 10);// produce 10 items when needed:for (int i = 0; i < 10; ++i) {while (shipment_available()) std::this_thread::yield();mtx.lock();cargo = i + 1;cv.notify_one();mtx.unlock();}consumer_thread.join();return 0;}// 输出12345678910
子线程因为cargo等于0,会进入阻塞状态,然后主线程修改cargo,并且唤醒子线程。 
只不过这里wait函数的参数由unique_lock<mutex>变为了mutex。
函数原型:
void notify_all_at_thread_exit (condition_variable& cond, unique_lock<mutex> lck);
当调用线程退出时,所有等待condition_variable的线程都会被唤醒。
使用该函数前,调用线程必须已经锁定了unique_lock的mutex。然后,该函数会先解锁unique_lock的mutex,然后唤醒其他线程,类似于下面的流程:
unique_lock.unlock();condition_variable.notify_all();
下面👇看一个案例:
std::mutex mtx;std::condition_variable cv;bool ready = false;void print_id(int id) {std::unique_lock<std::mutex> lck(mtx);while (!ready) cv.wait(lck);// ...std::cout << "thread " << id << '\n';}void go() {std::unique_lock<std::mutex> lck(mtx);// unique_lock禁止了拷贝构造函数和拷贝赋值操作符,所以只能使用移动构造函数和移动赋值操作符,这里借助移动语义,使用移动构造函数。std::notify_all_at_thread_exit(cv, std::move(lck));ready = true;}int main() {std::thread threads[10];// spawn 10 threads:for (int i = 0; i < 10; ++i)threads[i] = std::thread(print_id, i);std::cout << "10 threads ready to race...\n";std::thread(go).detach(); // go!for (auto &th : threads) th.join();return 0;}
10个子线程,阻塞(wait)在同一个mutex,go线程退出时,唤醒了所有子线程,然后10个子线程竞争锁,获得执行权。 
因为unique_lock禁止了拷贝构造函数和拷贝赋值操作符,所以只能使用移动构造函数和移动赋值操作符,所以上面借助移动语义,使用了unique_lock的移动构造函数。