This is one of those topics where I feel knowing the toolbox first matters more than applying those tools, though some application in an isolated context is still essential.
Mutex
Eliminating races by eliminating concurrency on shared data. The key point: if there are multiple independent units of shared data, you can still have concurrency across them; but if everything is hitting just one unit of shared data, we don't get any benefit from the whole ordeal.
Mutex: lock a particular block of code (critical section) so it is executed by only one thread at a time.
std::mutex in <mutex>
- Has lock(), try_lock(), unlock(). You don't want to use these directly, as that's against RAII principles; with a guard you don't need to remember to unlock ( see the sketch below ).
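A minimal sketch of why manual locking is fragile ( the counter and function are illustrative ):
#include <mutex>
std::mutex mtx;
int counter = 0;
void increment() {
    mtx.lock();
    ++counter; // if anything here threw or returned early, unlock() would never run
    mtx.unlock(); // easy to forget on every exit path
}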
RAII Guards
There are primarily three that you would want to use.
| Feature/Class | std::lock_guard | std::unique_lock | std::scoped_lock |
|---|---|---|---|
| C++ Standard | C++11 | C++11 | C++17 |
| Mutex Count | Single mutex | Single mutex | One or more mutexes (variadic template) |
| Ownership Transfer | No (neither copyable nor movable) | Yes (movable, but not copyable) | No (neither copyable nor movable) |
| Manual Lock/Unlock | No | Yes (lock(), unlock(), try_lock(), etc.) | No |
| Deferred Locking | No | Yes (std::defer_lock) | No |
| Deadlock Avoidance | No (for multiple mutexes, use std::lock) | No (for multiple mutexes, use std::lock with std::unique_locks in std::defer_lock mode) | Yes (built-in for multiple mutexes) |
| Primary Use Case | Simple, fixed-scope locking of a single mutex. | Flexible control over single mutex lifetime, required for std::condition_variable. | Safe, deadlock-free locking of multiple mutexes in a fixed scope. |
| Overhead | Low | Slightly higher than std::lock_guard (due to state tracking) | Similar to std::lock_guard for single mutex; optimized for multiple mutexes. |
A lock guard
std::mutex mtx;
{
std::lock_guard<std::mutex> lock(mtx); // always locks on construction
// if you pass in an already-locked mutex, use `std::lock_guard<std::mutex> lock(mtx, std::adopt_lock);`
// critical section
} // mtx is automatically unlocked here
A unique lock
std::mutex mtx;
{
std::unique_lock<std::mutex> lock(mtx); // to lock later manually instead, use std::unique_lock<std::mutex> lock(mtx, std::defer_lock);
// critical section
lock.unlock(); // can unlock manually if needed
// non-critical section
lock.lock(); // can lock again if needed
// critical section again
} // mtx is automatically unlocked here
A scoped lock. With multiple mutexes it uses a deadlock-avoidance algorithm ( as std::lock does ), so locking is not simply in the order of the arguments.
std::mutex mtx1, mtx2;
{
std::scoped_lock lock(mtx1, mtx2); // locks both mtx1 and mtx2
// critical section
} // mtx1 and mtx2 are automatically unlocked here
Single-writer, multiple-reader mutex
std::shared_mutex in <shared_mutex>
The idea: if the workload is read-heavy, you can allow multiple readers to read concurrently.
- A write blocks all reads ( as would any mutex )
- A read blocks writes ( at most one anyway ) ( as would any mutex )
- A read does not block other reads ( this is the optimization )
Note the usage of a shared_lock for reading and a unique_lock for writing.
std::shared_mutex shared_mtx;
// read
{
std::shared_lock<std::shared_mutex> lock(shared_mtx);
// read critical section
}
// write
{
std::unique_lock<std::shared_mutex> lock(shared_mtx);
// write critical section
}
To not use:
std::recursive_mutex in <mutex>, as needing it is often a sign of bad design and can lead to harder, non-intuitive errors.
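A sketch of the situation that tempts people into recursive_mutex ( the class and names are illustrative ): one locking member function calling another. With a plain std::mutex the inner lock would deadlock; recursive_mutex "works", but the cleaner design is a private, unlocked helper that both public functions call.
#include <map>
#include <mutex>
#include <string>
class Registry {
    std::recursive_mutex mtx;
    std::map<std::string, int> data;
public:
    void set(const std::string& k, int v) {
        std::lock_guard<std::recursive_mutex> lock(mtx);
        data[k] = v;
    }
    void set_defaults() {
        std::lock_guard<std::recursive_mutex> lock(mtx);
        set("a", 1); // re-locks the mutex this thread already holds
    }
};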
Shared Initialization ( or double-checked locking problem )
Case: a get_conn function that returns the connection, initializing it first if needed; it can be called by multiple threads simultaneously.
The name double-checked locking comes from the common ( and incorrect ) double-check pattern.
std::mutex mtx;
int* shared_data = nullptr;
int* get_shared_data() {
if (shared_data == nullptr) { // first check
std::lock_guard<std::mutex> lock(mtx);
if (shared_data == nullptr) { // second check
shared_data = new int(42); // initialize shared data
}
}
return shared_data;
}
The first check avoids the costly lock when the data is already initialized. The second check, done after locking, avoids the check-and-act race.
The error here is a memory-ordering race. With no memory-ordering guarantees, the compiler/CPU can reorder, so the pointer can be set before the pointed-to data is completely initialized ( note that this is never observable within a single thread, only across threads, and is unlikely, hence hard to debug ).
A futile attempt: using volatile to prevent reordering. It doesn't prevent all reordering and is not a solution.
Multiple solutions exist now:
- atomics using std::atomic (C++11) ( covered later )
- std::call_once (C++11) and std::once_flag
- static local variable initialization (C++11)
Way 2:
std::once_flag flag;
int* shared_data = nullptr;
int* get_shared_data() {
std::call_once(flag, []() {
shared_data = new int(42);
});
return shared_data;
}
Way 3:
int* get_shared_data() {
static int* shared_data = new int(42); // static local variable, initialized once
return shared_data;
}
A static local variable is shared, and its initialization is thread-safe in a multithreaded context since C++11.
Events
Responding to an event from another thread is a common pattern.
A generic way is using std::condition_variable in <condition_variable> with a mutex ( and its related RAII locks ).
#include <condition_variable>
#include <mutex>
std::mutex mtx;
std::condition_variable cv;
bool ready = false; // the event flag, protected by mtx
void worker() {
    {
        std::lock_guard<std::mutex> lock(mtx);
        // do some work
        ready = true; // publish the event under the lock
    }
    cv.notify_one();
}
void wait_for_event() {
    std::unique_lock<std::mutex> lock(mtx);
    cv.wait(lock, [] { return ready; });
    // Event has occurred
}
Reading cv.wait( lock, predicate ): with the lock held, check the predicate; if it's false, release the lock and sleep; on wake-up, re-acquire the lock and re-check.
Think of wait as a more optimised version of
while (!pred()) {
    lock.unlock();
    // the real cv blocks intelligently here; this naive version just busy-waits
    lock.lock();
}
The predicate is always checked while the lock is held. The unique_lock taken before the wait does the first locking.
One-off events → std::future
There are three ways to go about using a future, depending on the control needed.
std::async ( hands off )
No control over the thread and value setting.
std::future<int> result = std::async(std::launch::async, [] {
return 42;
});
// or
auto fut = std::async(foo, 1, 2); // takes args like std::thread or any bind-style callable
// Do other work while the async task runs
std::cout << "Result: " << result.get() << std::endl; // Blocks until the async task completes
The default policy is std::launch::async | std::launch::deferred, meaning the implementation decides. Omitting the policy argument uses this default.
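A small sketch of the deferred policy for contrast: the callable runs lazily, on the thread that calls get() or wait().
auto lazy = std::async(std::launch::deferred, [] {
    return 7; // nothing runs until get()/wait() is called
});
// ... no work has happened yet ...
int v = lazy.get(); // the callable executes here, on this thread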
std::packaged_task ( you manage the thread )
A packaged_task is a wrapper around a callable. You make a task, get the future from it, and invoking the task sets the value in the future. The idea is that you then move the task to a different thread and invoke it there.
#include <future>
#include <iostream>
#include <thread>
int main() {
std::packaged_task<int()> task([] {
std::this_thread::sleep_for(std::chrono::seconds(2));
return 42;
});
std::future<int> result = task.get_future();
std::thread(std::move(task)).detach(); // even if you detach you can still get the result later
std::cout << "Moved on immediately\n";
std::cout << "Result: " << result.get() << std::endl; // Blocks until task completes
return 0;
}
std::promise ( you manage the value set )
Along with managing the thread. In fact the thread part is optional, but without one a promise isn't very useful.
Similar to packaged_task, you make a promise, get a future from it, send it off to a thread. Except that since it’s not a callable, you need to write the callable that sets the value in the promise.
#include <future>
#include <iostream>
#include <thread>
int main() {
std::promise<int> promise;
std::future<int> result = promise.get_future();
std::thread([&promise]() {
std::this_thread::sleep_for(std::chrono::seconds(2));
promise.set_value(42); // Set the value in the promise
}).detach(); // Detach the thread
std::cout << "Moved on immediately\n";
std::cout << "Result: " << result.get() << std::endl; // Blocks until value is set
return 0;
}
Can you call get() multiple times? No, a second call throws. How to check whether it's gettable? Use future.wait_for(std::chrono::seconds(0)) == std::future_status::ready to check if the value is set. The block here is for 0 seconds, though it can be longer; this is one way of doing timeouts.
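That check in context ( assuming the future result from the promise example above ):
if (result.wait_for(std::chrono::seconds(0)) == std::future_status::ready) {
    std::cout << "Result: " << result.get() << std::endl; // safe, the value is set
} else {
    // not ready yet; do something else and try again later
}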
Exceptions in futures
For a promise, an exception is set explicitly using set_exception(). For the other two, an exception thrown inside the callable is captured automatically; all of them re-throw it when you call get().
What happens to the future if the promise is destroyed without setting a value, or the packaged_task is destroyed without ever being invoked? The destructor stores a std::future_error ( with code broken_promise ) as the exception on the future.
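A minimal sketch of both paths ( the error message is illustrative ):
#include <future>
#include <iostream>
#include <stdexcept>
int main() {
    std::promise<int> p;
    std::future<int> fut = p.get_future();
    p.set_exception(std::make_exception_ptr(std::runtime_error("boom"))); // explicit for a promise
    // with std::async / std::packaged_task, a throw inside the callable is captured automatically
    try {
        fut.get(); // re-throws the stored exception
    } catch (const std::runtime_error& e) {
        std::cout << e.what() << std::endl;
    }
    return 0;
}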
Sharing futures across threads
std::shared_future in <future>
The member functions on a future are not thread-safe, so you don't want multiple threads using the same future. But a future is also move-only, so sharing it directly would mean sharing references.
The idea is to use a shared future, which can be copied; each thread then uses its own copy of the shared future.
// f is std::future
auto shared_fut = std::shared_future<int>(std::move(f)); // move the future to shared future
// f is no longer valid after the move ( f.share() would have done the same )
f.valid(); // returns false
Timeouts
Absolute ( wait_until ) and relative ( wait_for ) timeouts.
Remember that a condition variable can wake up spuriously, so the wait needs to be in a loop, and you should use wait_until: with wait_for, a thread might wait almost the full duration, wake spuriously, see the condition still unmet, and go back to waiting for the entire duration all over again.
Details deferred for now.
Pure functions and functional programming
The idea: a pure function's result depends only on its arguments, so you can run it in parallel without worrying about shared state, passing the arguments in and collecting results with futures.
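A minimal sketch, fanning out an illustrative pure function via std::async:
#include <future>
#include <vector>
long square(long x) { return x * x; } // pure: depends only on its argument
int main() {
    std::vector<std::future<long>> futs;
    for (long i = 1; i <= 4; ++i)
        futs.push_back(std::async(std::launch::async, square, i));
    long sum = 0;
    for (auto& f : futs)
        sum += f.get(); // no shared mutable state anywhere
    return 0;
}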
Actor model
Each thread is an actor that responds to messages from a queue and updates its own state based on those messages. This is effectively a state machine.
void (class_name::*fn_name)() is a pointer to a member function of class class_name. This is used to pass member functions as callbacks. It's valid to drop the space between the class name and the * operator.
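A minimal actor sketch tying these together ( the class and message handling are illustrative ); note the worker thread is started with a pointer to the member function run:
#include <condition_variable>
#include <mutex>
#include <queue>
#include <thread>
class CounterActor {
    std::queue<int> mailbox;
    std::mutex mtx;
    std::condition_variable cv;
    bool done = false;
    int state = 0; // only ever touched by the actor's own thread
    std::thread worker;
    void run() {
        for (;;) {
            std::unique_lock<std::mutex> lock(mtx);
            cv.wait(lock, [this] { return !mailbox.empty() || done; });
            if (mailbox.empty()) return; // done and fully drained
            int msg = mailbox.front();
            mailbox.pop();
            lock.unlock();
            state += msg; // update state from the message: a little state machine
        }
    }
public:
    CounterActor() : worker(&CounterActor::run, this) {} // pointer to member function as the callable
    void send(int msg) {
        { std::lock_guard<std::mutex> lock(mtx); mailbox.push(msg); }
        cv.notify_one();
    }
    ~CounterActor() {
        { std::lock_guard<std::mutex> lock(mtx); done = true; }
        cv.notify_one();
        worker.join();
    }
};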
Experimentals
std::experimental::latch
Waits until the latch counter reaches zero. Any thread can decrement the counter, any number of times.
#include <experimental/latch>
std::experimental::latch latch(3); // Initialize latch with a count of 3
std::async(std::launch::async, [&latch]() {
    // Do some work
    latch.count_down(); // Decrement the latch count
}); // imagine three such async calls
// it's fine to pass the latch by reference like this: the wait below guarantees it stays alive until the async work completes
latch.wait(); // Wait until the latch count reaches zero
std::experimental::barrier
For a fixed number of threads. Each thread calls arrive_and_wait() to signal that it has reached the barrier. Once all threads have arrived, they can continue.
Barriers reset automatically once all threads arrive. The idea is to reuse the same threads and sync them all up at different points in the code.
#include <experimental/barrier>
std::experimental::barrier sync_point(3); // Initialize barrier for 3 threads
std::vector<std::thread> threads;
for (int i = 0; i < 3; ++i) {
threads.emplace_back([&sync_point]() {
// Do some work
sync_point.arrive_and_wait(); // Signal that the thread has reached the barrier
        // barrier resets automatically
        sync_point.arrive_and_wait(); // can sync up again at the next point
    });
}
for (auto& t : threads) t.join(); // join before the vector is destroyed
std::experimental::flex_barrier
Variable number of threads. Takes in a function that is called when all threads have arrived at the barrier. This function can be used to change the barrier count dynamically.
#include <experimental/flex_barrier>
#include <thread>
#include <vector>
#include <iostream>
int thread_count = 3;
std::experimental::flex_barrier barrier(thread_count,
    [&thread_count]() -> std::ptrdiff_t {
        std::cout << "All threads arrived" << std::endl;
        // Optionally return a different count for the next round; -1 means keep it unchanged
        return thread_count; // keep the same count
    }
);
std::vector<std::thread> threads;
for (int i = 0; i < thread_count; ++i) {
threads.emplace_back([&barrier, i]() {
std::cout << "Thread " << i << " reached barrier\n";
barrier.arrive_and_wait();
std::cout << "Thread " << i << " passed barrier\n";
// can also do arrive_and_drop() to remove the thread from the barrier
});
}
for (auto& t : threads) t.join(); // join before the vector is destroyed
then() chaining
A future can only be consumed once, so when chaining you cannot call get() on the intermediate futures. Instead, call it on the final future in the chain.
#include <experimental/future>
#include <iostream>
std::experimental::future<int> fut = std::experimental::make_ready_future(10);
auto chained = fut.then([](std::experimental::future<int> f) {
    return f.get() * 2; // this could return a value like here, or another future ( which would be auto-chained )
    // i.e. instead of future<future<int>>, then() returns future<int> by "unwrapping" the future
});
std::cout << "Result: " << chained.get() << std::endl;
when_all and when_any
Wait for all futures, or for any one of the futures ( so, racing futures ).
#include <experimental/future>
#include <vector>
std::vector<std::experimental::future<int>> futures; // futures are move-only, so they can't be brace-initialized into the vector
futures.push_back(std::experimental::make_ready_future(1));
futures.push_back(std::experimental::make_ready_future(2));
futures.push_back(std::experimental::make_ready_future(3));
auto all_fut = std::experimental::when_all(futures.begin(), futures.end());
// when_all consumed the futures above; when_any(first, last) works the same way on a fresh set
Semaphores
<semaphore> in C++20
Two types: counting_semaphore and binary_semaphore.
#include <semaphore>
// the template arg 100 is the maximum count; the constructor arg 10 is the initial count
auto sem = std::counting_semaphore<100>(10); // starts at 10, can go up to 100
sem.acquire(); // Decrement the semaphore count, blocks if count is 0
sem.release(); // Increment the semaphore count, does not block
auto sem2 = std::binary_semaphore(1); // initial count 0 or 1; basically a counting_semaphore<1>
The idea is that this allows a fixed number of threads to access a resource concurrently, unlike mutexes, which allow only one thread at a time.
The same thread can acquire multiple times ( say a connection that gets acquired twice in sequence ). Same with release.
Conceptually, a positive value is the number of threads that can still acquire; a negative value would be the number of blocked threads.
There's no ownership, so one thread releasing without ever acquiring is still valid, albeit likely a logic error.
Difference between a mutex and a binary semaphore? A mutex is owned by a thread and can only be released by that thread, while a binary semaphore can be released by any thread.
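A sketch of the fixed-number-of-threads idea ( pool size and work are illustrative ): at most three threads are inside the section at once.
#include <semaphore>
std::counting_semaphore<3> slots(3); // e.g. three pooled connections
void use_connection() {
    slots.acquire(); // blocks while all three are in use
    // ... use one of the connections ...
    slots.release(); // free a slot; any thread could legally do this
}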