From 2ccdcb5b98b7a14c5daa115fa8c1b14ca4cb9867 Mon Sep 17 00:00:00 2001 From: Sebastian Messmer Date: Wed, 14 Oct 2015 14:40:45 +0200 Subject: [PATCH] Cache destructs elements in parallel in destructor --- implementations/caching/CachingBlockStore.cpp | 6 +++ implementations/caching/CachingBlockStore.h | 2 + implementations/caching/cache/Cache.h | 42 +++++++++++++++---- 3 files changed, 42 insertions(+), 8 deletions(-) diff --git a/implementations/caching/CachingBlockStore.cpp b/implementations/caching/CachingBlockStore.cpp index bf4aef78..055796d4 100644 --- a/implementations/caching/CachingBlockStore.cpp +++ b/implementations/caching/CachingBlockStore.cpp @@ -26,7 +26,9 @@ Key CachingBlockStore::createKey() { } optional> CachingBlockStore::tryCreate(const Key &key, Data data) { + ASSERT(_cache.pop(key) == none, "Key already exists in cache"); //TODO Shouldn't we return boost::none if the key already exists? + //TODO Key can also already exist but not be in the cache right now. ++_numNewBlocks; return unique_ref(make_unique_ref(make_unique_ref(key, std::move(data), this), this)); } @@ -82,5 +84,9 @@ void CachingBlockStore::removeFromBaseStore(cpputils::unique_ref block) { _baseBlockStore->remove(std::move(block)); } +void CachingBlockStore::flush() { + _cache.flush(); +} + } } diff --git a/implementations/caching/CachingBlockStore.h b/implementations/caching/CachingBlockStore.h index 6d5660f1..c6ee255c 100644 --- a/implementations/caching/CachingBlockStore.h +++ b/implementations/caching/CachingBlockStore.h @@ -24,6 +24,8 @@ public: boost::optional> tryCreateInBaseStore(const Key &key, cpputils::Data data); void removeFromBaseStore(cpputils::unique_ref block); + void flush(); + private: cpputils::unique_ref _baseBlockStore; Cache, 1000> _cache; diff --git a/implementations/caching/cache/Cache.h b/implementations/caching/cache/Cache.h index 0ee0938d..89cef733 100644 --- a/implementations/caching/cache/Cache.h +++ b/implementations/caching/cache/Cache.h @@ -30,12 +30,16 @@ public: void push(const Key &key, Value value); boost::optional pop(const Key &key); + void flush(); + private: void _makeSpaceForEntry(std::unique_lock *lock); void _deleteEntry(std::unique_lock *lock); void _deleteOldEntriesParallel(); - void _deleteOldEntries(); - bool _deleteOldEntry(); + void _deleteAllEntriesParallel(); + void _deleteMatchingEntriesAtBeginningParallel(std::function &)> matches); + void _deleteMatchingEntriesAtBeginning(std::function &)> matches); + bool _deleteMatchingEntryAtBeginning(std::function &)> matches); mutable std::mutex _mutex; cpputils::LockPool _currentlyFlushingEntries; @@ -56,6 +60,8 @@ Cache::Cache(): _cachedBlocks(), _timeoutFlusher(nullpt template Cache::~Cache() { + _deleteAllEntriesParallel(); + ASSERT(_cachedBlocks.size() == 0, "Error in _deleteAllEntriesParallel()"); } template @@ -104,13 +110,27 @@ void Cache::_deleteEntry(std::unique_lock * lock->lock(); }; +template +void Cache::_deleteAllEntriesParallel() { + return _deleteMatchingEntriesAtBeginningParallel([] (const CacheEntry &) { + return true; + }); +} + template void Cache::_deleteOldEntriesParallel() { + return _deleteMatchingEntriesAtBeginningParallel([] (const CacheEntry &entry) { + return entry.ageSeconds() > PURGE_LIFETIME_SEC; + }); +} + +template +void Cache::_deleteMatchingEntriesAtBeginningParallel(std::function &)> matches) { unsigned int numThreads = std::max(1u, std::thread::hardware_concurrency()); std::vector> waitHandles; for (unsigned int i = 0; i < numThreads; ++i) { - waitHandles.push_back(std::async(std::launch::async, [this] { - _deleteOldEntries(); + waitHandles.push_back(std::async(std::launch::async, [this, matches] { + _deleteMatchingEntriesAtBeginning(matches); })); } for (auto & waitHandle : waitHandles) { @@ -119,16 +139,16 @@ void Cache::_deleteOldEntriesParallel() { }; template -void Cache::_deleteOldEntries() { - while (_deleteOldEntry()) {} +void Cache::_deleteMatchingEntriesAtBeginning(std::function &)> matches) { + while (_deleteMatchingEntryAtBeginning(matches)) {} } template -bool Cache::_deleteOldEntry() { +bool Cache::_deleteMatchingEntryAtBeginning(std::function &)> matches) { // This function can be called in parallel by multiple threads and will then cause the Value destructors // to be called in parallel. The call to _deleteEntry() releases the lock while the Value destructor is running. std::unique_lock lock(_mutex); - if (_cachedBlocks.size() > 0 && _cachedBlocks.peek()->ageSeconds() > PURGE_LIFETIME_SEC) { + if (_cachedBlocks.size() > 0 && matches(*_cachedBlocks.peek())) { _deleteEntry(&lock); return true; } else { @@ -142,6 +162,12 @@ uint32_t Cache::size() const { return _cachedBlocks.size(); }; +template +void Cache::flush() { + //TODO Test flush() + return _deleteAllEntriesParallel(); +}; + } }