Cache has better parallelity - we can push()/pop() while it is flushing and flushing is actually parallel
This commit is contained in:
parent
c8c13517e0
commit
84330b1100
98
implementations/caching/cache/Cache.h
vendored
98
implementations/caching/cache/Cache.h
vendored
@ -6,11 +6,10 @@
|
|||||||
#include "QueueMap.h"
|
#include "QueueMap.h"
|
||||||
#include "PeriodicTask.h"
|
#include "PeriodicTask.h"
|
||||||
#include <memory>
|
#include <memory>
|
||||||
//TODO Replace with C++14 once std::shared_mutex is supported
|
|
||||||
#include <boost/thread/shared_mutex.hpp>
|
|
||||||
#include <boost/optional.hpp>
|
#include <boost/optional.hpp>
|
||||||
#include <future>
|
#include <future>
|
||||||
#include <messmer/cpp-utils/assert/assert.h>
|
#include <messmer/cpp-utils/assert/assert.h>
|
||||||
|
#include <messmer/cpp-utils/lock/LockPool.h>
|
||||||
|
|
||||||
namespace blockstore {
|
namespace blockstore {
|
||||||
namespace caching {
|
namespace caching {
|
||||||
@ -31,12 +30,14 @@ public:
|
|||||||
boost::optional<Value> pop(const Key &key);
|
boost::optional<Value> pop(const Key &key);
|
||||||
|
|
||||||
private:
|
private:
|
||||||
void _popOldEntriesParallel();
|
void _makeSpaceForEntry(std::unique_lock<std::mutex> *lock);
|
||||||
void _popOldEntries();
|
void _deleteEntry(std::unique_lock<std::mutex> *lock);
|
||||||
boost::optional<Value> _popOldEntry(boost::upgrade_lock<boost::shared_mutex> *lock);
|
void _deleteOldEntriesParallel();
|
||||||
static void _destructElementsInParallel(std::vector<CacheEntry<Key, Value>> *list);
|
void _deleteOldEntries();
|
||||||
|
bool _deleteOldEntry();
|
||||||
|
|
||||||
mutable boost::shared_mutex _mutex;
|
mutable std::mutex _mutex;
|
||||||
|
cpputils::LockPool<Key> _currentlyFlushingEntries;
|
||||||
QueueMap<Key, CacheEntry<Key, Value>> _cachedBlocks;
|
QueueMap<Key, CacheEntry<Key, Value>> _cachedBlocks;
|
||||||
std::unique_ptr<PeriodicTask> _timeoutFlusher;
|
std::unique_ptr<PeriodicTask> _timeoutFlusher;
|
||||||
};
|
};
|
||||||
@ -50,7 +51,7 @@ template<class Key, class Value>
|
|||||||
Cache<Key, Value>::Cache(): _cachedBlocks(), _timeoutFlusher(nullptr) {
|
Cache<Key, Value>::Cache(): _cachedBlocks(), _timeoutFlusher(nullptr) {
|
||||||
//Don't initialize timeoutFlusher in the initializer list,
|
//Don't initialize timeoutFlusher in the initializer list,
|
||||||
//because it then might already call Cache::popOldEntries() before Cache is done constructing.
|
//because it then might already call Cache::popOldEntries() before Cache is done constructing.
|
||||||
_timeoutFlusher = std::make_unique<PeriodicTask>(std::bind(&Cache::_popOldEntriesParallel, this), PURGE_INTERVAL);
|
_timeoutFlusher = std::make_unique<PeriodicTask>(std::bind(&Cache::_deleteOldEntriesParallel, this), PURGE_INTERVAL);
|
||||||
}
|
}
|
||||||
|
|
||||||
template<class Key, class Value>
|
template<class Key, class Value>
|
||||||
@ -59,32 +60,59 @@ Cache<Key, Value>::~Cache() {
|
|||||||
|
|
||||||
template<class Key, class Value>
|
template<class Key, class Value>
|
||||||
boost::optional<Value> Cache<Key, Value>::pop(const Key &key) {
|
boost::optional<Value> Cache<Key, Value>::pop(const Key &key) {
|
||||||
boost::unique_lock<boost::shared_mutex> lock(_mutex);
|
std::unique_lock<std::mutex> lock(_mutex);
|
||||||
|
_currentlyFlushingEntries.lock(key, &lock);
|
||||||
|
|
||||||
auto found = _cachedBlocks.pop(key);
|
auto found = _cachedBlocks.pop(key);
|
||||||
if (!found) {
|
if (!found) {
|
||||||
return boost::none;
|
return boost::none;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
_currentlyFlushingEntries.release(key);
|
||||||
return found->releaseValue();
|
return found->releaseValue();
|
||||||
}
|
}
|
||||||
|
|
||||||
template<class Key, class Value>
|
template<class Key, class Value>
|
||||||
void Cache<Key, Value>::push(const Key &key, Value value) {
|
void Cache<Key, Value>::push(const Key &key, Value value) {
|
||||||
boost::unique_lock<boost::shared_mutex> lock(_mutex);
|
std::unique_lock<std::mutex> lock(_mutex);
|
||||||
ASSERT(_cachedBlocks.size() <= MAX_ENTRIES, "Cache too full");
|
ASSERT(_cachedBlocks.size() <= MAX_ENTRIES, "Cache too full");
|
||||||
if (_cachedBlocks.size() == MAX_ENTRIES) {
|
_makeSpaceForEntry(&lock);
|
||||||
_cachedBlocks.pop();
|
|
||||||
ASSERT(_cachedBlocks.size() == MAX_ENTRIES-1, "Removing entry from cache didn't work");
|
|
||||||
}
|
|
||||||
_cachedBlocks.push(key, CacheEntry<Key, Value>(std::move(value)));
|
_cachedBlocks.push(key, CacheEntry<Key, Value>(std::move(value)));
|
||||||
}
|
}
|
||||||
|
|
||||||
template<class Key, class Value>
|
template<class Key, class Value>
|
||||||
void Cache<Key, Value>::_popOldEntriesParallel() {
|
void Cache<Key, Value>::_makeSpaceForEntry(std::unique_lock<std::mutex> *lock) {
|
||||||
|
// _deleteEntry releases the lock while the Value destructor is running.
|
||||||
|
// So we can destruct multiple entries in parallel and also call pop() or push() while doing so.
|
||||||
|
// However, if another thread calls push() before we get the lock back, the cache is full again.
|
||||||
|
// That's why we need the while() loop here.
|
||||||
|
while (_cachedBlocks.size() == MAX_ENTRIES) {
|
||||||
|
_deleteEntry(lock);
|
||||||
|
}
|
||||||
|
ASSERT(_cachedBlocks.size() < MAX_ENTRIES, "Removing entry from cache didn't work");
|
||||||
|
};
|
||||||
|
|
||||||
|
template<class Key, class Value>
|
||||||
|
void Cache<Key, Value>::_deleteEntry(std::unique_lock<std::mutex> *lock) {
|
||||||
|
auto key = _cachedBlocks.peekKey();
|
||||||
|
ASSERT(key != boost::none, "There was no entry to delete");
|
||||||
|
_currentlyFlushingEntries.lock(*key);
|
||||||
|
auto value = _cachedBlocks.pop();
|
||||||
|
// Call destructor outside of the unique_lock,
|
||||||
|
// i.e. pop() and push() can be called here, except for pop() on the element in _currentlyFlushingEntries
|
||||||
|
lock->unlock();
|
||||||
|
value = boost::none; // Call destructor
|
||||||
|
lock->lock();
|
||||||
|
_currentlyFlushingEntries.release(*key);
|
||||||
|
};
|
||||||
|
|
||||||
|
template<class Key, class Value>
|
||||||
|
void Cache<Key, Value>::_deleteOldEntriesParallel() {
|
||||||
unsigned int numThreads = std::max(1u, std::thread::hardware_concurrency());
|
unsigned int numThreads = std::max(1u, std::thread::hardware_concurrency());
|
||||||
std::vector<std::future<void>> waitHandles;
|
std::vector<std::future<void>> waitHandles;
|
||||||
for (unsigned int i = 0; i < numThreads; ++i) {
|
for (unsigned int i = 0; i < numThreads; ++i) {
|
||||||
waitHandles.push_back(std::async(std::launch::async, [this] {
|
waitHandles.push_back(std::async(std::launch::async, [this] {
|
||||||
_popOldEntries();
|
_deleteOldEntries();
|
||||||
}));
|
}));
|
||||||
}
|
}
|
||||||
for (auto & waitHandle : waitHandles) {
|
for (auto & waitHandle : waitHandles) {
|
||||||
@ -93,40 +121,20 @@ void Cache<Key, Value>::_popOldEntriesParallel() {
|
|||||||
};
|
};
|
||||||
|
|
||||||
template<class Key, class Value>
|
template<class Key, class Value>
|
||||||
void Cache<Key, Value>::_popOldEntries() {
|
void Cache<Key, Value>::_deleteOldEntries() {
|
||||||
// This function can be called in parallel by multiple threads and will then cause the Value destructors
|
while (_deleteOldEntry()) {}
|
||||||
// to be called in parallel. The call to _popOldEntry() is synchronized to avoid race conditions,
|
|
||||||
// but the Value destructor is called in this function which is not synchronized.
|
|
||||||
|
|
||||||
// The shared upgrade_lock in here takes care that no push() or pop() operation is running while an old entry is deleted.
|
|
||||||
// This would cause race conditions because pop() could return none before the destructor of the deleted element
|
|
||||||
// has finished running. Since the destructor of a cached newly created block creates it in the base block store,
|
|
||||||
// there would be a state where the block can neither be found by a pop() in the cache, nor in the base store.
|
|
||||||
// so CachingBlockStore would return that the block doesn't exist.
|
|
||||||
// The shared lock is then upgraded to a unique lock in _popOldEntry, so that only one thread can work on the
|
|
||||||
// _cachedBlocks vector at the same time.
|
|
||||||
// There is a regression test case for this: CacheTest_RaceCondition:PopBlocksWhileRequestedElementIsThrownOut.
|
|
||||||
while (true) {
|
|
||||||
//TODO Since there are 4 threads running this, there will always be one having one of the shared locks.
|
|
||||||
// That is, while purging is running, no thread has a chance of calling pop() or push() and purging has priority.
|
|
||||||
// Fix this, use something like priority locks for pop() and push()?
|
|
||||||
// http://stackoverflow.com/questions/11666610/how-to-give-priority-to-privileged-thread-in-mutex-locking
|
|
||||||
boost::upgrade_lock<boost::shared_mutex> lock(_mutex);
|
|
||||||
boost::optional<Value> oldEntry = _popOldEntry(&lock);
|
|
||||||
if (oldEntry == boost::none) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
oldEntry = boost::none; // Call destructor (inside shared lock)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
template<class Key, class Value>
|
template<class Key, class Value>
|
||||||
boost::optional<Value> Cache<Key, Value>::_popOldEntry(boost::upgrade_lock<boost::shared_mutex> *lock) {
|
bool Cache<Key, Value>::_deleteOldEntry() {
|
||||||
boost::upgrade_to_unique_lock<boost::shared_mutex> exclusiveLock(*lock);
|
// This function can be called in parallel by multiple threads and will then cause the Value destructors
|
||||||
|
// to be called in parallel. The call to _deleteEntry() releases the lock while the Value destructor is running.
|
||||||
|
std::unique_lock<std::mutex> lock(_mutex);
|
||||||
if (_cachedBlocks.size() > 0 && _cachedBlocks.peek()->ageSeconds() > PURGE_LIFETIME_SEC) {
|
if (_cachedBlocks.size() > 0 && _cachedBlocks.peek()->ageSeconds() > PURGE_LIFETIME_SEC) {
|
||||||
return _cachedBlocks.pop()->releaseValue();
|
_deleteEntry(&lock);
|
||||||
|
return true;
|
||||||
} else {
|
} else {
|
||||||
return boost::none;
|
return false;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
7
implementations/caching/cache/QueueMap.h
vendored
7
implementations/caching/cache/QueueMap.h
vendored
@ -52,6 +52,13 @@ public:
|
|||||||
return pop(*_sentinel.next->key);
|
return pop(*_sentinel.next->key);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
boost::optional<const Key &> peekKey() {
|
||||||
|
if(_sentinel.next == &_sentinel) {
|
||||||
|
return boost::none;
|
||||||
|
}
|
||||||
|
return *_sentinel.next->key;
|
||||||
|
}
|
||||||
|
|
||||||
boost::optional<const Value &> peek() {
|
boost::optional<const Value &> peek() {
|
||||||
if(_sentinel.next == &_sentinel) {
|
if(_sentinel.next == &_sentinel) {
|
||||||
return boost::none;
|
return boost::none;
|
||||||
|
@ -1,15 +1,22 @@
|
|||||||
#include "testutils/CacheTest.h"
|
#include "testutils/CacheTest.h"
|
||||||
#include <chrono>
|
#include <chrono>
|
||||||
#include <thread>
|
#include <thread>
|
||||||
#include <messmer/cpp-utils/pointer/unique_ref.h>
|
#include <memory>
|
||||||
|
#include <future>
|
||||||
#include <messmer/cpp-utils/lock/ConditionBarrier.h>
|
#include <messmer/cpp-utils/lock/ConditionBarrier.h>
|
||||||
|
|
||||||
using namespace std::chrono_literals;
|
using namespace std::chrono_literals;
|
||||||
using namespace blockstore::caching;
|
using namespace blockstore::caching;
|
||||||
using std::string;
|
using std::string;
|
||||||
using cpputils::unique_ref;
|
|
||||||
using cpputils::make_unique_ref;
|
|
||||||
using cpputils::ConditionBarrier;
|
using cpputils::ConditionBarrier;
|
||||||
|
using std::unique_ptr;
|
||||||
|
using std::make_unique;
|
||||||
|
using std::future;
|
||||||
|
|
||||||
|
// Regression tests for a race condition.
|
||||||
|
// An element could be in the process of being thrown out of the cache and while the destructor is running, another
|
||||||
|
// thread calls pop() for the element and gets none returned. But since the destructor isn't finished yet, the data from
|
||||||
|
// the cache element also isn't completely written back yet and an application loading it runs into a race condition.
|
||||||
|
|
||||||
class ObjectWithLongDestructor {
|
class ObjectWithLongDestructor {
|
||||||
public:
|
public:
|
||||||
@ -25,20 +32,74 @@ private:
|
|||||||
bool *_destructorFinished;
|
bool *_destructorFinished;
|
||||||
};
|
};
|
||||||
|
|
||||||
// Regression test for a race condition.
|
class CacheTest_RaceCondition: public ::testing::Test {
|
||||||
// An element could be in the process of being thrown out of the cache and while the destructor is running, another
|
public:
|
||||||
// thread calls pop() for the element and gets none returned. But since the destructor isn't finished yet, the data from
|
CacheTest_RaceCondition(): cache(), destructorStarted(), destructorFinished(false) {}
|
||||||
// the cache element also isn't completely written back yet and an application loading it runs into a race condition.
|
|
||||||
TEST(CacheTest_RaceCondition, PopBlocksWhileRequestedElementIsThrownOut) {
|
Cache<int, unique_ptr<ObjectWithLongDestructor>> cache;
|
||||||
ConditionBarrier destructorStarted;
|
ConditionBarrier destructorStarted;
|
||||||
bool destructorFinished;
|
bool destructorFinished;
|
||||||
|
|
||||||
auto obj = make_unique_ref<ObjectWithLongDestructor>(&destructorStarted, &destructorFinished);
|
int pushObjectWithLongDestructor() {
|
||||||
Cache<int, unique_ref<ObjectWithLongDestructor>> cache;
|
cache.push(2, make_unique<ObjectWithLongDestructor>(&destructorStarted, &destructorFinished));
|
||||||
cache.push(2, std::move(obj));
|
return 2;
|
||||||
|
}
|
||||||
|
|
||||||
|
int pushDummyObject() {
|
||||||
|
cache.push(3, nullptr);
|
||||||
|
return 3;
|
||||||
|
}
|
||||||
|
|
||||||
|
future<void> causeCacheOverflowInOtherThread() {
|
||||||
|
//Add maximum+1 element in another thread (this causes the cache to flush the first element in another thread)
|
||||||
|
return std::async(std::launch::async, [this] {
|
||||||
|
for(unsigned int i = 0; i < cache.MAX_ENTRIES+1; ++i) {
|
||||||
|
cache.push(cache.MAX_ENTRIES+i, nullptr);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
void EXPECT_POP_BLOCKS_UNTIL_DESTRUCTOR_FINISHED(int key) {
|
||||||
|
EXPECT_FALSE(destructorFinished);
|
||||||
|
cache.pop(key);
|
||||||
|
EXPECT_TRUE(destructorFinished);
|
||||||
|
}
|
||||||
|
|
||||||
|
void EXPECT_POP_DOESNT_BLOCK_UNTIL_DESTRUCTOR_FINISHED(int key) {
|
||||||
|
EXPECT_FALSE(destructorFinished);
|
||||||
|
cache.pop(key);
|
||||||
|
EXPECT_FALSE(destructorFinished);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
TEST_F(CacheTest_RaceCondition, PopBlocksWhileRequestedElementIsThrownOut_ByAge) {
|
||||||
|
auto id = pushObjectWithLongDestructor();
|
||||||
|
|
||||||
destructorStarted.wait();
|
destructorStarted.wait();
|
||||||
EXPECT_FALSE(destructorFinished);
|
EXPECT_POP_BLOCKS_UNTIL_DESTRUCTOR_FINISHED(id);
|
||||||
cache.pop(2);
|
}
|
||||||
EXPECT_TRUE(destructorFinished);
|
|
||||||
|
TEST_F(CacheTest_RaceCondition, PopDoesntBlockWhileOtherElementIsThrownOut_ByAge) {
|
||||||
|
pushObjectWithLongDestructor();
|
||||||
|
auto id = pushDummyObject();
|
||||||
|
|
||||||
|
destructorStarted.wait();
|
||||||
|
EXPECT_POP_DOESNT_BLOCK_UNTIL_DESTRUCTOR_FINISHED(id);
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST_F(CacheTest_RaceCondition, PopBlocksWhileRequestedElementIsThrownOut_ByPush) {
|
||||||
|
auto id = pushObjectWithLongDestructor();
|
||||||
|
|
||||||
|
auto future = causeCacheOverflowInOtherThread();
|
||||||
|
destructorStarted.wait();
|
||||||
|
EXPECT_POP_BLOCKS_UNTIL_DESTRUCTOR_FINISHED(id);
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST_F(CacheTest_RaceCondition, PopDoesntBlockWhileOtherElementIsThrownOut_ByPush) {
|
||||||
|
pushObjectWithLongDestructor();
|
||||||
|
auto id = pushDummyObject();
|
||||||
|
|
||||||
|
auto future = causeCacheOverflowInOtherThread();
|
||||||
|
destructorStarted.wait();
|
||||||
|
EXPECT_POP_DOESNT_BLOCK_UNTIL_DESTRUCTOR_FINISHED(id);
|
||||||
}
|
}
|
Loading…
Reference in New Issue
Block a user