diff --git a/implementations/caching/cache/Cache.h b/implementations/caching/cache/Cache.h index 8660b663..48655660 100644 --- a/implementations/caching/cache/Cache.h +++ b/implementations/caching/cache/Cache.h @@ -6,11 +6,10 @@ #include "QueueMap.h" #include "PeriodicTask.h" #include -//TODO Replace with C++14 once std::shared_mutex is supported -#include #include #include #include +#include namespace blockstore { namespace caching { @@ -31,12 +30,14 @@ public: boost::optional pop(const Key &key); private: - void _popOldEntriesParallel(); - void _popOldEntries(); - boost::optional _popOldEntry(boost::upgrade_lock *lock); - static void _destructElementsInParallel(std::vector> *list); + void _makeSpaceForEntry(std::unique_lock *lock); + void _deleteEntry(std::unique_lock *lock); + void _deleteOldEntriesParallel(); + void _deleteOldEntries(); + bool _deleteOldEntry(); - mutable boost::shared_mutex _mutex; + mutable std::mutex _mutex; + cpputils::LockPool _currentlyFlushingEntries; QueueMap> _cachedBlocks; std::unique_ptr _timeoutFlusher; }; @@ -50,7 +51,7 @@ template Cache::Cache(): _cachedBlocks(), _timeoutFlusher(nullptr) { //Don't initialize timeoutFlusher in the initializer list, //because it then might already call Cache::popOldEntries() before Cache is done constructing. - _timeoutFlusher = std::make_unique(std::bind(&Cache::_popOldEntriesParallel, this), PURGE_INTERVAL); + _timeoutFlusher = std::make_unique(std::bind(&Cache::_deleteOldEntriesParallel, this), PURGE_INTERVAL); } template @@ -59,32 +60,59 @@ Cache::~Cache() { template boost::optional Cache::pop(const Key &key) { - boost::unique_lock lock(_mutex); + std::unique_lock lock(_mutex); + _currentlyFlushingEntries.lock(key, &lock); + auto found = _cachedBlocks.pop(key); if (!found) { return boost::none; } + + _currentlyFlushingEntries.release(key); return found->releaseValue(); } template void Cache::push(const Key &key, Value value) { - boost::unique_lock lock(_mutex); + std::unique_lock lock(_mutex); ASSERT(_cachedBlocks.size() <= MAX_ENTRIES, "Cache too full"); - if (_cachedBlocks.size() == MAX_ENTRIES) { - _cachedBlocks.pop(); - ASSERT(_cachedBlocks.size() == MAX_ENTRIES-1, "Removing entry from cache didn't work"); - } + _makeSpaceForEntry(&lock); _cachedBlocks.push(key, CacheEntry(std::move(value))); } template -void Cache::_popOldEntriesParallel() { +void Cache::_makeSpaceForEntry(std::unique_lock *lock) { + // _deleteEntry releases the lock while the Value destructor is running. + // So we can destruct multiple entries in parallel and also call pop() or push() while doing so. + // However, if another thread calls push() before we get the lock back, the cache is full again. + // That's why we need the while() loop here. + while (_cachedBlocks.size() == MAX_ENTRIES) { + _deleteEntry(lock); + } + ASSERT(_cachedBlocks.size() < MAX_ENTRIES, "Removing entry from cache didn't work"); +}; + +template +void Cache::_deleteEntry(std::unique_lock *lock) { + auto key = _cachedBlocks.peekKey(); + ASSERT(key != boost::none, "There was no entry to delete"); + _currentlyFlushingEntries.lock(*key); + auto value = _cachedBlocks.pop(); + // Call destructor outside of the unique_lock, + // i.e. pop() and push() can be called here, except for pop() on the element in _currentlyFlushingEntries + lock->unlock(); + value = boost::none; // Call destructor + lock->lock(); + _currentlyFlushingEntries.release(*key); +}; + +template +void Cache::_deleteOldEntriesParallel() { unsigned int numThreads = std::max(1u, std::thread::hardware_concurrency()); std::vector> waitHandles; for (unsigned int i = 0; i < numThreads; ++i) { waitHandles.push_back(std::async(std::launch::async, [this] { - _popOldEntries(); + _deleteOldEntries(); })); } for (auto & waitHandle : waitHandles) { @@ -93,40 +121,20 @@ void Cache::_popOldEntriesParallel() { }; template -void Cache::_popOldEntries() { - // This function can be called in parallel by multiple threads and will then cause the Value destructors - // to be called in parallel. The call to _popOldEntry() is synchronized to avoid race conditions, - // but the Value destructor is called in this function which is not synchronized. - - // The shared upgrade_lock in here takes care that no push() or pop() operation is running while an old entry is deleted. - // This would cause race conditions because pop() could return none before the destructor of the deleted element - // has finished running. Since the destructor of a cached newly created block creates it in the base block store, - // there would be a state where the block can neither be found by a pop() in the cache, nor in the base store. - // so CachingBlockStore would return that the block doesn't exist. - // The shared lock is then upgraded to a unique lock in _popOldEntry, so that only one thread can work on the - // _cachedBlocks vector at the same time. - // There is a regression test case for this: CacheTest_RaceCondition:PopBlocksWhileRequestedElementIsThrownOut. - while (true) { - //TODO Since there are 4 threads running this, there will always be one having one of the shared locks. - // That is, while purging is running, no thread has a chance of calling pop() or push() and purging has priority. - // Fix this, use something like priority locks for pop() and push()? - // http://stackoverflow.com/questions/11666610/how-to-give-priority-to-privileged-thread-in-mutex-locking - boost::upgrade_lock lock(_mutex); - boost::optional oldEntry = _popOldEntry(&lock); - if (oldEntry == boost::none) { - break; - } - oldEntry = boost::none; // Call destructor (inside shared lock) - } +void Cache::_deleteOldEntries() { + while (_deleteOldEntry()) {} } template -boost::optional Cache::_popOldEntry(boost::upgrade_lock *lock) { - boost::upgrade_to_unique_lock exclusiveLock(*lock); +bool Cache::_deleteOldEntry() { + // This function can be called in parallel by multiple threads and will then cause the Value destructors + // to be called in parallel. The call to _deleteEntry() releases the lock while the Value destructor is running. + std::unique_lock lock(_mutex); if (_cachedBlocks.size() > 0 && _cachedBlocks.peek()->ageSeconds() > PURGE_LIFETIME_SEC) { - return _cachedBlocks.pop()->releaseValue(); + _deleteEntry(&lock); + return true; } else { - return boost::none; + return false; } }; diff --git a/implementations/caching/cache/QueueMap.h b/implementations/caching/cache/QueueMap.h index f412432b..e13809b8 100644 --- a/implementations/caching/cache/QueueMap.h +++ b/implementations/caching/cache/QueueMap.h @@ -52,6 +52,13 @@ public: return pop(*_sentinel.next->key); } + boost::optional peekKey() { + if(_sentinel.next == &_sentinel) { + return boost::none; + } + return *_sentinel.next->key; + } + boost::optional peek() { if(_sentinel.next == &_sentinel) { return boost::none; diff --git a/test/implementations/caching/cache/CacheTest_RaceCondition.cpp b/test/implementations/caching/cache/CacheTest_RaceCondition.cpp index 9ce282ae..7a7741ad 100644 --- a/test/implementations/caching/cache/CacheTest_RaceCondition.cpp +++ b/test/implementations/caching/cache/CacheTest_RaceCondition.cpp @@ -1,15 +1,22 @@ #include "testutils/CacheTest.h" #include #include -#include +#include +#include #include using namespace std::chrono_literals; using namespace blockstore::caching; using std::string; -using cpputils::unique_ref; -using cpputils::make_unique_ref; using cpputils::ConditionBarrier; +using std::unique_ptr; +using std::make_unique; +using std::future; + +// Regression tests for a race condition. +// An element could be in the process of being thrown out of the cache and while the destructor is running, another +// thread calls pop() for the element and gets none returned. But since the destructor isn't finished yet, the data from +// the cache element also isn't completely written back yet and an application loading it runs into a race condition. class ObjectWithLongDestructor { public: @@ -25,20 +32,74 @@ private: bool *_destructorFinished; }; -// Regression test for a race condition. -// An element could be in the process of being thrown out of the cache and while the destructor is running, another -// thread calls pop() for the element and gets none returned. But since the destructor isn't finished yet, the data from -// the cache element also isn't completely written back yet and an application loading it runs into a race condition. -TEST(CacheTest_RaceCondition, PopBlocksWhileRequestedElementIsThrownOut) { +class CacheTest_RaceCondition: public ::testing::Test { +public: + CacheTest_RaceCondition(): cache(), destructorStarted(), destructorFinished(false) {} + + Cache> cache; ConditionBarrier destructorStarted; bool destructorFinished; - auto obj = make_unique_ref(&destructorStarted, &destructorFinished); - Cache> cache; - cache.push(2, std::move(obj)); + int pushObjectWithLongDestructor() { + cache.push(2, make_unique(&destructorStarted, &destructorFinished)); + return 2; + } + + int pushDummyObject() { + cache.push(3, nullptr); + return 3; + } + + future causeCacheOverflowInOtherThread() { + //Add maximum+1 element in another thread (this causes the cache to flush the first element in another thread) + return std::async(std::launch::async, [this] { + for(unsigned int i = 0; i < cache.MAX_ENTRIES+1; ++i) { + cache.push(cache.MAX_ENTRIES+i, nullptr); + } + }); + } + + void EXPECT_POP_BLOCKS_UNTIL_DESTRUCTOR_FINISHED(int key) { + EXPECT_FALSE(destructorFinished); + cache.pop(key); + EXPECT_TRUE(destructorFinished); + } + + void EXPECT_POP_DOESNT_BLOCK_UNTIL_DESTRUCTOR_FINISHED(int key) { + EXPECT_FALSE(destructorFinished); + cache.pop(key); + EXPECT_FALSE(destructorFinished); + } +}; + +TEST_F(CacheTest_RaceCondition, PopBlocksWhileRequestedElementIsThrownOut_ByAge) { + auto id = pushObjectWithLongDestructor(); destructorStarted.wait(); - EXPECT_FALSE(destructorFinished); - cache.pop(2); - EXPECT_TRUE(destructorFinished); -} \ No newline at end of file + EXPECT_POP_BLOCKS_UNTIL_DESTRUCTOR_FINISHED(id); +} + +TEST_F(CacheTest_RaceCondition, PopDoesntBlockWhileOtherElementIsThrownOut_ByAge) { + pushObjectWithLongDestructor(); + auto id = pushDummyObject(); + + destructorStarted.wait(); + EXPECT_POP_DOESNT_BLOCK_UNTIL_DESTRUCTOR_FINISHED(id); +} + +TEST_F(CacheTest_RaceCondition, PopBlocksWhileRequestedElementIsThrownOut_ByPush) { + auto id = pushObjectWithLongDestructor(); + + auto future = causeCacheOverflowInOtherThread(); + destructorStarted.wait(); + EXPECT_POP_BLOCKS_UNTIL_DESTRUCTOR_FINISHED(id); +} + +TEST_F(CacheTest_RaceCondition, PopDoesntBlockWhileOtherElementIsThrownOut_ByPush) { + pushObjectWithLongDestructor(); + auto id = pushDummyObject(); + + auto future = causeCacheOverflowInOtherThread(); + destructorStarted.wait(); + EXPECT_POP_DOESNT_BLOCK_UNTIL_DESTRUCTOR_FINISHED(id); +}