Implement LeftRight
This commit is contained in:
parent
652a95dd0d
commit
97e0a7e031
1
src/cpp-utils/thread/LeftRight.cpp
Normal file
1
src/cpp-utils/thread/LeftRight.cpp
Normal file
@ -0,0 +1 @@
|
||||
#include "LeftRight.h"
|
165
src/cpp-utils/thread/LeftRight.h
Normal file
165
src/cpp-utils/thread/LeftRight.h
Normal file
@ -0,0 +1,165 @@
|
||||
#include <atomic>
|
||||
#include <functional>
|
||||
#include <mutex>
|
||||
#include <thread>
|
||||
#include <cpp-utils/macros.h>
|
||||
#include <array>
|
||||
|
||||
namespace cpputils {
|
||||
|
||||
namespace detail {
|
||||
|
||||
struct IncrementRAII final {
|
||||
public:
|
||||
explicit IncrementRAII(std::atomic<int32_t> *counter): _counter(counter) {
|
||||
++(*_counter);
|
||||
}
|
||||
|
||||
~IncrementRAII() {
|
||||
--(*_counter);
|
||||
}
|
||||
private:
|
||||
std::atomic<int32_t> *_counter;
|
||||
|
||||
DISALLOW_COPY_AND_ASSIGN(IncrementRAII);
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
// LeftRight wait-free readers synchronization primitive
|
||||
// https://hal.archives-ouvertes.fr/hal-01207881/document
|
||||
template <class T>
|
||||
class LeftRight final {
|
||||
public:
|
||||
LeftRight()
|
||||
: _writeMutex()
|
||||
, _foregroundCounterIndex{0}
|
||||
, _foregroundDataIndex{0}
|
||||
, _counters{{{0}, {0}}}
|
||||
, _data{{{}, {}}}
|
||||
, _inDestruction(false) {}
|
||||
|
||||
~LeftRight() {
|
||||
// from now on, no new readers/writers will be accepted (see asserts in read()/write())
|
||||
_inDestruction = true;
|
||||
|
||||
// wait until any potentially running writers are finished
|
||||
{
|
||||
std::unique_lock<std::mutex> lock(_writeMutex);
|
||||
}
|
||||
|
||||
// wait until any potentially running readers are finished
|
||||
while (_counters[0].load() != 0 || _counters[1].load() != 0) {
|
||||
std::this_thread::yield();
|
||||
}
|
||||
}
|
||||
|
||||
template <typename F>
|
||||
auto read(F&& readFunc) const {
|
||||
if(_inDestruction.load()) {
|
||||
throw std::logic_error("Issued LeftRight::read() after the destructor started running");
|
||||
}
|
||||
|
||||
detail::IncrementRAII _increment_counter(&_counters[_foregroundCounterIndex.load()]); // NOLINT(cppcoreguidelines-pro-bounds-constant-array-index)
|
||||
return readFunc(_data[_foregroundDataIndex.load()]); // NOLINT(cppcoreguidelines-pro-bounds-constant-array-index)
|
||||
}
|
||||
|
||||
// Throwing from write would result in invalid state
|
||||
template <typename F>
|
||||
auto write(F&& writeFunc) {
|
||||
if(_inDestruction.load()) {
|
||||
throw std::logic_error("Issued LeftRight::read() after the destructor started running");
|
||||
}
|
||||
|
||||
std::unique_lock<std::mutex> lock(_writeMutex);
|
||||
return _write(writeFunc);
|
||||
}
|
||||
|
||||
private:
|
||||
template <class F>
|
||||
auto _write(const F& writeFunc) {
|
||||
/*
|
||||
* Assume, A is in background and B in foreground. In simplified terms, we want to do the following:
|
||||
* 1. Write to A (old background)
|
||||
* 2. Switch A/B
|
||||
* 3. Write to B (new background)
|
||||
*
|
||||
* More detailed algorithm (explanations on why this is important are below in code):
|
||||
* 1. Write to A
|
||||
* 2. Switch A/B data pointers
|
||||
* 3. Wait until A counter is zero
|
||||
* 4. Switch A/B counters
|
||||
* 5. Wait until B counter is zero
|
||||
* 6. Write to B
|
||||
*/
|
||||
|
||||
auto localDataIndex = _foregroundDataIndex.load();
|
||||
|
||||
// 1. Write to A
|
||||
_callWriteFuncOnBackgroundInstance(writeFunc, localDataIndex);
|
||||
|
||||
// 2. Switch A/B data pointers
|
||||
localDataIndex = localDataIndex ^ 1;
|
||||
_foregroundDataIndex = localDataIndex;
|
||||
|
||||
/*
|
||||
* 3. Wait until A counter is zero
|
||||
*
|
||||
* In the previous write run, A was foreground and B was background.
|
||||
* There was a time after switching _foregroundDataIndex (B to foreground) and before switching _foregroundCounterIndex,
|
||||
* in which new readers could have read B but incremented A's counter.
|
||||
*
|
||||
* In this current run, we just switched _foregroundDataIndex (A back to foreground), but before writing to
|
||||
* the new background B, we have to make sure A's counter was zero briefly, so all these old readers are gone.
|
||||
*/
|
||||
auto localCounterIndex = _foregroundCounterIndex.load();
|
||||
_waitForBackgroundCounterToBeZero(localCounterIndex);
|
||||
|
||||
/*
|
||||
*4. Switch A/B counters
|
||||
*
|
||||
* Now that we know all readers on B are really gone, we can switch the counters and have new readers
|
||||
* increment A's counter again, which is the correct counter since they're reading A.
|
||||
*/
|
||||
localCounterIndex = localCounterIndex ^ 1;
|
||||
_foregroundCounterIndex = localCounterIndex;
|
||||
|
||||
/*
|
||||
* 5. Wait until B counter is zero
|
||||
*
|
||||
* This waits for all the readers on B that came in while both data and counter for B was in foreground,
|
||||
* i.e. normal readers that happened outside of that brief gap between switching data and counter.
|
||||
*/
|
||||
_waitForBackgroundCounterToBeZero(localCounterIndex);
|
||||
|
||||
// 6. Write to B
|
||||
_callWriteFuncOnBackgroundInstance(writeFunc, localDataIndex);
|
||||
}
|
||||
|
||||
template<class F>
|
||||
auto _callWriteFuncOnBackgroundInstance(const F& writeFunc, uint8_t localDataIndex) {
|
||||
try {
|
||||
return writeFunc(_data[localDataIndex ^ 1]); // NOLINT(cppcoreguidelines-pro-bounds-constant-array-index)
|
||||
} catch (...) {
|
||||
// recover invariant by copying from the foreground instance
|
||||
_data[localDataIndex ^ 1] = _data[localDataIndex]; // NOLINT(cppcoreguidelines-pro-bounds-constant-array-index)
|
||||
// rethrow
|
||||
throw;
|
||||
}
|
||||
}
|
||||
|
||||
void _waitForBackgroundCounterToBeZero(uint8_t counterIndex) {
|
||||
while (_counters[counterIndex ^ 1].load() != 0) { // NOLINT(cppcoreguidelines-pro-bounds-constant-array-index)
|
||||
std::this_thread::yield();
|
||||
}
|
||||
}
|
||||
|
||||
std::mutex _writeMutex;
|
||||
std::atomic<uint8_t> _foregroundCounterIndex;
|
||||
std::atomic<uint8_t> _foregroundDataIndex;
|
||||
mutable std::array<std::atomic<int32_t>, 2> _counters;
|
||||
std::array<T, 2> _data;
|
||||
std::atomic<bool> _inDestruction;
|
||||
};
|
||||
|
||||
}
|
@ -55,6 +55,7 @@ set(SOURCES
|
||||
system/HomedirTest.cpp
|
||||
system/EnvTest.cpp
|
||||
thread/debugging_test.cpp
|
||||
thread/LeftRightTest.cpp
|
||||
value_type/ValueTypeTest.cpp
|
||||
)
|
||||
|
||||
|
204
test/cpp-utils/thread/LeftRightTest.cpp
Normal file
204
test/cpp-utils/thread/LeftRightTest.cpp
Normal file
@ -0,0 +1,204 @@
|
||||
#include <cpp-utils/thread/LeftRight.h>
|
||||
#include <gtest/gtest.h>
|
||||
#include <vector>
|
||||
|
||||
using cpputils::LeftRight;
|
||||
using std::vector;
|
||||
|
||||
TEST(LeftRightTest, givenInt_whenWritingAndReading_thenChangesArePresent) {
|
||||
LeftRight<int> obj;
|
||||
|
||||
obj.write([] (auto& obj) {obj = 5;});
|
||||
int read = obj.read([] (auto& obj) {return obj;});
|
||||
EXPECT_EQ(5, read);
|
||||
|
||||
// check changes are also present in background copy
|
||||
obj.write([] (auto&) {}); // this switches to the background copy
|
||||
read = obj.read([] (auto& obj) {return obj;});
|
||||
EXPECT_EQ(5, read);
|
||||
}
|
||||
|
||||
TEST(LeftRightTest, givenVector_whenWritingAndReading_thenChangesArePresent) {
|
||||
LeftRight<vector<int>> obj;
|
||||
|
||||
obj.write([] (auto& obj) {obj.push_back(5);});
|
||||
vector<int> read = obj.read([] (auto& obj) {return obj;});
|
||||
EXPECT_EQ((vector<int>{5}), read);
|
||||
|
||||
obj.write([] (auto& obj) {obj.push_back(6);});
|
||||
read = obj.read([] (auto& obj) {return obj;});
|
||||
EXPECT_EQ((vector<int>{5, 6}), read);
|
||||
}
|
||||
|
||||
TEST(LeftRightTest, readsCanBeConcurrent) {
|
||||
LeftRight<int> obj;
|
||||
std::atomic<int> num_running_readers{0};
|
||||
|
||||
std::thread reader1([&] () {
|
||||
obj.read([&] (auto&) {
|
||||
++num_running_readers;
|
||||
while(num_running_readers.load() < 2) {}
|
||||
});
|
||||
});
|
||||
|
||||
std::thread reader2([&] () {
|
||||
obj.read([&] (auto&) {
|
||||
++num_running_readers;
|
||||
while(num_running_readers.load() < 2) {}
|
||||
});
|
||||
});
|
||||
|
||||
// the threads only finish after both entered the read function.
|
||||
// if LeftRight didn't allow concurrency, this would cause a deadlock.
|
||||
reader1.join();
|
||||
reader2.join();
|
||||
}
|
||||
|
||||
TEST(LeftRightTest, writesCanBeConcurrentWithReads_readThenWrite) {
|
||||
LeftRight<int> obj;
|
||||
std::atomic<bool> reader_running{false};
|
||||
std::atomic<bool> writer_running{false};
|
||||
|
||||
std::thread reader([&] () {
|
||||
obj.read([&] (auto&) {
|
||||
reader_running = true;
|
||||
while(!writer_running.load()) {}
|
||||
});
|
||||
});
|
||||
|
||||
std::thread writer([&] () {
|
||||
// run read first, write second
|
||||
while (!reader_running.load()) {}
|
||||
|
||||
obj.write([&] (auto&) {
|
||||
writer_running = true;
|
||||
});
|
||||
});
|
||||
|
||||
// the threads only finish after both entered the read function.
|
||||
// if LeftRight didn't allow concurrency, this would cause a deadlock.
|
||||
reader.join();
|
||||
writer.join();
|
||||
}
|
||||
|
||||
TEST(LeftRightTest, writesCanBeConcurrentWithReads_writeThenRead) {
|
||||
LeftRight<int> obj;
|
||||
std::atomic<bool> writer_running{false};
|
||||
std::atomic<bool> reader_running{false};
|
||||
|
||||
std::thread writer([&] () {
|
||||
obj.read([&] (auto&) {
|
||||
writer_running = true;
|
||||
while(!reader_running.load()) {}
|
||||
});
|
||||
});
|
||||
|
||||
std::thread reader([&] () {
|
||||
// run write first, read second
|
||||
while (!writer_running.load()) {}
|
||||
|
||||
obj.read([&] (auto&) {
|
||||
reader_running = true;
|
||||
});
|
||||
});
|
||||
|
||||
// the threads only finish after both entered the read function.
|
||||
// if LeftRight didn't allow concurrency, this would cause a deadlock.
|
||||
writer.join();
|
||||
reader.join();
|
||||
}
|
||||
|
||||
TEST(LeftRightTest, writesCannotBeConcurrentWithWrites) {
|
||||
LeftRight<int> obj;
|
||||
std::atomic<bool> first_writer_started{false};
|
||||
std::atomic<bool> first_writer_finished{false};
|
||||
|
||||
std::thread writer1([&] () {
|
||||
obj.write([&] (auto&) {
|
||||
first_writer_started = true;
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(50));
|
||||
first_writer_finished = true;
|
||||
});
|
||||
});
|
||||
|
||||
std::thread writer2([&] () {
|
||||
// make sure the other writer runs first
|
||||
while (!first_writer_started.load()) {}
|
||||
|
||||
obj.write([&] (auto&) {
|
||||
// expect the other writer finished before this one starts
|
||||
EXPECT_TRUE(first_writer_finished.load());
|
||||
});
|
||||
});
|
||||
|
||||
writer1.join();
|
||||
writer2.join();
|
||||
}
|
||||
|
||||
namespace {
|
||||
class MyException : std::exception {};
|
||||
}
|
||||
|
||||
TEST(LeftRightTest, whenReadThrowsException_thenThrowsThrough) {
|
||||
LeftRight<int> obj;
|
||||
|
||||
EXPECT_THROW(
|
||||
obj.read([](auto&) {throw MyException();}),
|
||||
MyException
|
||||
);
|
||||
}
|
||||
|
||||
TEST(LeftRightTest, whenWriteThrowsException_thenThrowsThrough) {
|
||||
LeftRight<int> obj;
|
||||
|
||||
EXPECT_THROW(
|
||||
obj.write([](auto&) {throw MyException();}),
|
||||
MyException
|
||||
);
|
||||
}
|
||||
|
||||
TEST(LeftRightTest, givenInt_whenWriteThrowsException_thenResetsToOldState) {
|
||||
LeftRight<int> obj;
|
||||
|
||||
obj.write([](auto& obj) {obj = 5;});
|
||||
|
||||
EXPECT_THROW(
|
||||
obj.write([](auto& obj) {
|
||||
obj = 6;
|
||||
throw MyException();
|
||||
}),
|
||||
MyException
|
||||
);
|
||||
|
||||
// check reading it returns old value
|
||||
int read = obj.read([] (auto& obj) {return obj;});
|
||||
EXPECT_EQ(5, read);
|
||||
|
||||
// check changes are also present in background copy
|
||||
obj.write([] (auto&) {}); // this switches to the background copy
|
||||
read = obj.read([] (auto& obj) {return obj;});
|
||||
EXPECT_EQ(5, read);
|
||||
}
|
||||
|
||||
TEST(LeftRightTest, givenVector_whenWriteThrowsException_thenResetsToOldState) {
|
||||
LeftRight<vector<int>> obj;
|
||||
|
||||
obj.write([](auto& obj) {obj.push_back(5);});
|
||||
|
||||
EXPECT_THROW(
|
||||
obj.write([](auto& obj) {
|
||||
obj.push_back(6);
|
||||
throw MyException();
|
||||
}),
|
||||
MyException
|
||||
);
|
||||
|
||||
// check reading it returns old value
|
||||
vector<int> read = obj.read([] (auto& obj) {return obj;});
|
||||
EXPECT_EQ((vector<int>{5}), read);
|
||||
|
||||
// check changes are also present in background copy
|
||||
obj.write([] (auto&) {}); // this switches to the background copy
|
||||
read = obj.read([] (auto& obj) {return obj;});
|
||||
EXPECT_EQ((vector<int>{5}), read);
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user