diff --git a/implementations/caching/cache/Cache.h b/implementations/caching/cache/Cache.h index ddfa2b76..007f2023 100644 --- a/implementations/caching/cache/Cache.h +++ b/implementations/caching/cache/Cache.h @@ -30,7 +30,9 @@ public: boost::optional pop(const Key &key); private: + void _popOldEntriesParallel(); void _popOldEntries(); + boost::optional _popOldEntry(); static void _destructElementsInParallel(std::vector> *list); mutable std::mutex _mutex; @@ -46,8 +48,8 @@ template constexpr double Cache::MAX_LIFETIM template Cache::Cache(): _cachedBlocks(), _timeoutFlusher(nullptr) { //Don't initialize timeoutFlusher in the initializer list, - //because it then might already call Cache::popOldEntries() before Cache is done constructing - _timeoutFlusher = std::make_unique(std::bind(&Cache::_popOldEntries, this), PURGE_INTERVAL); + //because it then might already call Cache::popOldEntries() before Cache is done constructing. + _timeoutFlusher = std::make_unique(std::bind(&Cache::_popOldEntriesParallel, this), PURGE_INTERVAL); } template @@ -76,31 +78,41 @@ void Cache::push(const Key &key, Value value) { } template -void Cache::_popOldEntries() { - std::lock_guard lock(_mutex); - std::vector> entriesToDelete; - while(_cachedBlocks.size() > 0 && _cachedBlocks.peek()->ageSeconds() > PURGE_LIFETIME_SEC) { - entriesToDelete.push_back(*_cachedBlocks.pop()); +void Cache::_popOldEntriesParallel() { + unsigned int numThreads = std::max(1u, std::thread::hardware_concurrency()); + std::vector> waitHandles; + for (unsigned int i = 0; i < numThreads; ++i) { + waitHandles.push_back(std::async(std::launch::async, [this] { + _popOldEntries(); + })); + } + for (auto & waitHandle : waitHandles) { + waitHandle.wait(); + } +}; + +template +void Cache::_popOldEntries() { + // This function can be called in parallel by multiple threads and will then cause the Value destructors + // to be called in parallel. The call to _popOldEntry() is synchronized to avoid race conditions, + // but the Value destructor is called in this function which is not synchronized. + int num = 0; + boost::optional oldEntry = _popOldEntry(); + while (oldEntry != boost::none) { + ++num; + oldEntry = _popOldEntry(); } - _destructElementsInParallel(&entriesToDelete); } template -void Cache::_destructElementsInParallel(std::vector> *list) { - //TODO Check whether this parallel destruction below works (just comment it in but keep the list->clear()) and check performance impacts. Is it better to have a lower parallelity level, i.e. #core threads? - /* - std::vector> waitHandles; - for (auto & entry : *list) { - waitHandles.push_back(std::async(std::launch::async, [&entry] { - entry.releaseValue(); - })); +boost::optional Cache::_popOldEntry() { + std::lock_guard lock(_mutex); + if (_cachedBlocks.size() > 0 && _cachedBlocks.peek()->ageSeconds() > PURGE_LIFETIME_SEC) { + return _cachedBlocks.pop()->releaseValue(); + } else { + return boost::none; } - for (auto & waitHandle : waitHandles) { - waitHandle.wait(); - } - */ - list->clear(); -} +}; } } diff --git a/implementations/caching/cache/PeriodicTask.cpp b/implementations/caching/cache/PeriodicTask.cpp index 273bb15e..68142427 100644 --- a/implementations/caching/cache/PeriodicTask.cpp +++ b/implementations/caching/cache/PeriodicTask.cpp @@ -17,6 +17,9 @@ PeriodicTask::PeriodicTask(function task, double intervalSec) : _thread } } catch (const boost::thread_interrupted &e) { //Do nothing, exit thread. + } catch (const std::exception &e) { + //TODO Think about logging + cerr << "PeriodicTask crashed: " << e.what() << endl; } catch (...) { //TODO Think about logging cerr << "PeriodicTask crashed" << endl;