From 97e0a7e031b11a2ed6439e627541a159b6921051 Mon Sep 17 00:00:00 2001 From: Sebastian Messmer Date: Mon, 21 Jan 2019 18:59:38 -0800 Subject: [PATCH] Implement LeftRight --- src/cpp-utils/thread/LeftRight.cpp | 1 + src/cpp-utils/thread/LeftRight.h | 165 +++++++++++++++++++ test/cpp-utils/CMakeLists.txt | 1 + test/cpp-utils/thread/LeftRightTest.cpp | 204 ++++++++++++++++++++++++ 4 files changed, 371 insertions(+) create mode 100644 src/cpp-utils/thread/LeftRight.cpp create mode 100644 src/cpp-utils/thread/LeftRight.h create mode 100644 test/cpp-utils/thread/LeftRightTest.cpp diff --git a/src/cpp-utils/thread/LeftRight.cpp b/src/cpp-utils/thread/LeftRight.cpp new file mode 100644 index 00000000..d0c6cbd2 --- /dev/null +++ b/src/cpp-utils/thread/LeftRight.cpp @@ -0,0 +1 @@ +#include "LeftRight.h" diff --git a/src/cpp-utils/thread/LeftRight.h b/src/cpp-utils/thread/LeftRight.h new file mode 100644 index 00000000..1870eee9 --- /dev/null +++ b/src/cpp-utils/thread/LeftRight.h @@ -0,0 +1,165 @@ +#include +#include +#include +#include +#include +#include + +namespace cpputils { + +namespace detail { + +struct IncrementRAII final { +public: + explicit IncrementRAII(std::atomic *counter): _counter(counter) { + ++(*_counter); + } + + ~IncrementRAII() { + --(*_counter); + } +private: + std::atomic *_counter; + + DISALLOW_COPY_AND_ASSIGN(IncrementRAII); +}; + +} + +// LeftRight wait-free readers synchronization primitive +// https://hal.archives-ouvertes.fr/hal-01207881/document +template +class LeftRight final { +public: + LeftRight() + : _writeMutex() + , _foregroundCounterIndex{0} + , _foregroundDataIndex{0} + , _counters{{{0}, {0}}} + , _data{{{}, {}}} + , _inDestruction(false) {} + + ~LeftRight() { + // from now on, no new readers/writers will be accepted (see asserts in read()/write()) + _inDestruction = true; + + // wait until any potentially running writers are finished + { + std::unique_lock lock(_writeMutex); + } + + // wait until any potentially running readers are finished + while (_counters[0].load() != 0 || _counters[1].load() != 0) { + std::this_thread::yield(); + } + } + + template + auto read(F&& readFunc) const { + if(_inDestruction.load()) { + throw std::logic_error("Issued LeftRight::read() after the destructor started running"); + } + + detail::IncrementRAII _increment_counter(&_counters[_foregroundCounterIndex.load()]); // NOLINT(cppcoreguidelines-pro-bounds-constant-array-index) + return readFunc(_data[_foregroundDataIndex.load()]); // NOLINT(cppcoreguidelines-pro-bounds-constant-array-index) + } + + // Throwing from write would result in invalid state + template + auto write(F&& writeFunc) { + if(_inDestruction.load()) { + throw std::logic_error("Issued LeftRight::read() after the destructor started running"); + } + + std::unique_lock lock(_writeMutex); + return _write(writeFunc); + } + +private: + template + auto _write(const F& writeFunc) { + /* + * Assume, A is in background and B in foreground. In simplified terms, we want to do the following: + * 1. Write to A (old background) + * 2. Switch A/B + * 3. Write to B (new background) + * + * More detailed algorithm (explanations on why this is important are below in code): + * 1. Write to A + * 2. Switch A/B data pointers + * 3. Wait until A counter is zero + * 4. Switch A/B counters + * 5. Wait until B counter is zero + * 6. Write to B + */ + + auto localDataIndex = _foregroundDataIndex.load(); + + // 1. Write to A + _callWriteFuncOnBackgroundInstance(writeFunc, localDataIndex); + + // 2. Switch A/B data pointers + localDataIndex = localDataIndex ^ 1; + _foregroundDataIndex = localDataIndex; + + /* + * 3. Wait until A counter is zero + * + * In the previous write run, A was foreground and B was background. + * There was a time after switching _foregroundDataIndex (B to foreground) and before switching _foregroundCounterIndex, + * in which new readers could have read B but incremented A's counter. + * + * In this current run, we just switched _foregroundDataIndex (A back to foreground), but before writing to + * the new background B, we have to make sure A's counter was zero briefly, so all these old readers are gone. + */ + auto localCounterIndex = _foregroundCounterIndex.load(); + _waitForBackgroundCounterToBeZero(localCounterIndex); + + /* + *4. Switch A/B counters + * + * Now that we know all readers on B are really gone, we can switch the counters and have new readers + * increment A's counter again, which is the correct counter since they're reading A. + */ + localCounterIndex = localCounterIndex ^ 1; + _foregroundCounterIndex = localCounterIndex; + + /* + * 5. Wait until B counter is zero + * + * This waits for all the readers on B that came in while both data and counter for B was in foreground, + * i.e. normal readers that happened outside of that brief gap between switching data and counter. + */ + _waitForBackgroundCounterToBeZero(localCounterIndex); + + // 6. Write to B + _callWriteFuncOnBackgroundInstance(writeFunc, localDataIndex); + } + + template + auto _callWriteFuncOnBackgroundInstance(const F& writeFunc, uint8_t localDataIndex) { + try { + return writeFunc(_data[localDataIndex ^ 1]); // NOLINT(cppcoreguidelines-pro-bounds-constant-array-index) + } catch (...) { + // recover invariant by copying from the foreground instance + _data[localDataIndex ^ 1] = _data[localDataIndex]; // NOLINT(cppcoreguidelines-pro-bounds-constant-array-index) + // rethrow + throw; + } + } + + void _waitForBackgroundCounterToBeZero(uint8_t counterIndex) { + while (_counters[counterIndex ^ 1].load() != 0) { // NOLINT(cppcoreguidelines-pro-bounds-constant-array-index) + std::this_thread::yield(); + } + } + + std::mutex _writeMutex; + std::atomic _foregroundCounterIndex; + std::atomic _foregroundDataIndex; + mutable std::array, 2> _counters; + std::array _data; + std::atomic _inDestruction; +}; + +} diff --git a/test/cpp-utils/CMakeLists.txt b/test/cpp-utils/CMakeLists.txt index d8efa2e6..be602ee9 100644 --- a/test/cpp-utils/CMakeLists.txt +++ b/test/cpp-utils/CMakeLists.txt @@ -55,6 +55,7 @@ set(SOURCES system/HomedirTest.cpp system/EnvTest.cpp thread/debugging_test.cpp + thread/LeftRightTest.cpp value_type/ValueTypeTest.cpp ) diff --git a/test/cpp-utils/thread/LeftRightTest.cpp b/test/cpp-utils/thread/LeftRightTest.cpp new file mode 100644 index 00000000..2c2db300 --- /dev/null +++ b/test/cpp-utils/thread/LeftRightTest.cpp @@ -0,0 +1,204 @@ +#include +#include +#include + +using cpputils::LeftRight; +using std::vector; + +TEST(LeftRightTest, givenInt_whenWritingAndReading_thenChangesArePresent) { + LeftRight obj; + + obj.write([] (auto& obj) {obj = 5;}); + int read = obj.read([] (auto& obj) {return obj;}); + EXPECT_EQ(5, read); + + // check changes are also present in background copy + obj.write([] (auto&) {}); // this switches to the background copy + read = obj.read([] (auto& obj) {return obj;}); + EXPECT_EQ(5, read); +} + +TEST(LeftRightTest, givenVector_whenWritingAndReading_thenChangesArePresent) { + LeftRight> obj; + + obj.write([] (auto& obj) {obj.push_back(5);}); + vector read = obj.read([] (auto& obj) {return obj;}); + EXPECT_EQ((vector{5}), read); + + obj.write([] (auto& obj) {obj.push_back(6);}); + read = obj.read([] (auto& obj) {return obj;}); + EXPECT_EQ((vector{5, 6}), read); +} + +TEST(LeftRightTest, readsCanBeConcurrent) { + LeftRight obj; + std::atomic num_running_readers{0}; + + std::thread reader1([&] () { + obj.read([&] (auto&) { + ++num_running_readers; + while(num_running_readers.load() < 2) {} + }); + }); + + std::thread reader2([&] () { + obj.read([&] (auto&) { + ++num_running_readers; + while(num_running_readers.load() < 2) {} + }); + }); + + // the threads only finish after both entered the read function. + // if LeftRight didn't allow concurrency, this would cause a deadlock. + reader1.join(); + reader2.join(); +} + +TEST(LeftRightTest, writesCanBeConcurrentWithReads_readThenWrite) { + LeftRight obj; + std::atomic reader_running{false}; + std::atomic writer_running{false}; + + std::thread reader([&] () { + obj.read([&] (auto&) { + reader_running = true; + while(!writer_running.load()) {} + }); + }); + + std::thread writer([&] () { + // run read first, write second + while (!reader_running.load()) {} + + obj.write([&] (auto&) { + writer_running = true; + }); + }); + + // the threads only finish after both entered the read function. + // if LeftRight didn't allow concurrency, this would cause a deadlock. + reader.join(); + writer.join(); +} + +TEST(LeftRightTest, writesCanBeConcurrentWithReads_writeThenRead) { + LeftRight obj; + std::atomic writer_running{false}; + std::atomic reader_running{false}; + + std::thread writer([&] () { + obj.read([&] (auto&) { + writer_running = true; + while(!reader_running.load()) {} + }); + }); + + std::thread reader([&] () { + // run write first, read second + while (!writer_running.load()) {} + + obj.read([&] (auto&) { + reader_running = true; + }); + }); + + // the threads only finish after both entered the read function. + // if LeftRight didn't allow concurrency, this would cause a deadlock. + writer.join(); + reader.join(); +} + +TEST(LeftRightTest, writesCannotBeConcurrentWithWrites) { + LeftRight obj; + std::atomic first_writer_started{false}; + std::atomic first_writer_finished{false}; + + std::thread writer1([&] () { + obj.write([&] (auto&) { + first_writer_started = true; + std::this_thread::sleep_for(std::chrono::milliseconds(50)); + first_writer_finished = true; + }); + }); + + std::thread writer2([&] () { + // make sure the other writer runs first + while (!first_writer_started.load()) {} + + obj.write([&] (auto&) { + // expect the other writer finished before this one starts + EXPECT_TRUE(first_writer_finished.load()); + }); + }); + + writer1.join(); + writer2.join(); +} + +namespace { +class MyException : std::exception {}; +} + +TEST(LeftRightTest, whenReadThrowsException_thenThrowsThrough) { + LeftRight obj; + + EXPECT_THROW( + obj.read([](auto&) {throw MyException();}), + MyException + ); +} + +TEST(LeftRightTest, whenWriteThrowsException_thenThrowsThrough) { + LeftRight obj; + + EXPECT_THROW( + obj.write([](auto&) {throw MyException();}), + MyException + ); +} + +TEST(LeftRightTest, givenInt_whenWriteThrowsException_thenResetsToOldState) { + LeftRight obj; + + obj.write([](auto& obj) {obj = 5;}); + + EXPECT_THROW( + obj.write([](auto& obj) { + obj = 6; + throw MyException(); + }), + MyException + ); + + // check reading it returns old value + int read = obj.read([] (auto& obj) {return obj;}); + EXPECT_EQ(5, read); + + // check changes are also present in background copy + obj.write([] (auto&) {}); // this switches to the background copy + read = obj.read([] (auto& obj) {return obj;}); + EXPECT_EQ(5, read); +} + +TEST(LeftRightTest, givenVector_whenWriteThrowsException_thenResetsToOldState) { + LeftRight> obj; + + obj.write([](auto& obj) {obj.push_back(5);}); + + EXPECT_THROW( + obj.write([](auto& obj) { + obj.push_back(6); + throw MyException(); + }), + MyException + ); + + // check reading it returns old value + vector read = obj.read([] (auto& obj) {return obj;}); + EXPECT_EQ((vector{5}), read); + + // check changes are also present in background copy + obj.write([] (auto&) {}); // this switches to the background copy + read = obj.read([] (auto& obj) {return obj;}); + EXPECT_EQ((vector{5}), read); +}