Cache is flushed in parallel
This commit is contained in:
parent
52bb855627
commit
959ef62a38
56
implementations/caching/cache/Cache.h
vendored
56
implementations/caching/cache/Cache.h
vendored
@ -30,7 +30,9 @@ public:
|
|||||||
boost::optional<Value> pop(const Key &key);
|
boost::optional<Value> pop(const Key &key);
|
||||||
|
|
||||||
private:
|
private:
|
||||||
|
void _popOldEntriesParallel();
|
||||||
void _popOldEntries();
|
void _popOldEntries();
|
||||||
|
boost::optional<Value> _popOldEntry();
|
||||||
static void _destructElementsInParallel(std::vector<CacheEntry<Key, Value>> *list);
|
static void _destructElementsInParallel(std::vector<CacheEntry<Key, Value>> *list);
|
||||||
|
|
||||||
mutable std::mutex _mutex;
|
mutable std::mutex _mutex;
|
||||||
@ -46,8 +48,8 @@ template<class Key, class Value> constexpr double Cache<Key, Value>::MAX_LIFETIM
|
|||||||
template<class Key, class Value>
|
template<class Key, class Value>
|
||||||
Cache<Key, Value>::Cache(): _cachedBlocks(), _timeoutFlusher(nullptr) {
|
Cache<Key, Value>::Cache(): _cachedBlocks(), _timeoutFlusher(nullptr) {
|
||||||
//Don't initialize timeoutFlusher in the initializer list,
|
//Don't initialize timeoutFlusher in the initializer list,
|
||||||
//because it then might already call Cache::popOldEntries() before Cache is done constructing
|
//because it then might already call Cache::popOldEntries() before Cache is done constructing.
|
||||||
_timeoutFlusher = std::make_unique<PeriodicTask>(std::bind(&Cache::_popOldEntries, this), PURGE_INTERVAL);
|
_timeoutFlusher = std::make_unique<PeriodicTask>(std::bind(&Cache::_popOldEntriesParallel, this), PURGE_INTERVAL);
|
||||||
}
|
}
|
||||||
|
|
||||||
template<class Key, class Value>
|
template<class Key, class Value>
|
||||||
@ -76,31 +78,41 @@ void Cache<Key, Value>::push(const Key &key, Value value) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
template<class Key, class Value>
|
template<class Key, class Value>
|
||||||
void Cache<Key, Value>::_popOldEntries() {
|
void Cache<Key, Value>::_popOldEntriesParallel() {
|
||||||
std::lock_guard<std::mutex> lock(_mutex);
|
unsigned int numThreads = std::max(1u, std::thread::hardware_concurrency());
|
||||||
std::vector<CacheEntry<Key, Value>> entriesToDelete;
|
std::vector<std::future<void>> waitHandles;
|
||||||
while(_cachedBlocks.size() > 0 && _cachedBlocks.peek()->ageSeconds() > PURGE_LIFETIME_SEC) {
|
for (unsigned int i = 0; i < numThreads; ++i) {
|
||||||
entriesToDelete.push_back(*_cachedBlocks.pop());
|
waitHandles.push_back(std::async(std::launch::async, [this] {
|
||||||
|
_popOldEntries();
|
||||||
|
}));
|
||||||
|
}
|
||||||
|
for (auto & waitHandle : waitHandles) {
|
||||||
|
waitHandle.wait();
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
template<class Key, class Value>
|
||||||
|
void Cache<Key, Value>::_popOldEntries() {
|
||||||
|
// This function can be called in parallel by multiple threads and will then cause the Value destructors
|
||||||
|
// to be called in parallel. The call to _popOldEntry() is synchronized to avoid race conditions,
|
||||||
|
// but the Value destructor is called in this function which is not synchronized.
|
||||||
|
int num = 0;
|
||||||
|
boost::optional<Value> oldEntry = _popOldEntry();
|
||||||
|
while (oldEntry != boost::none) {
|
||||||
|
++num;
|
||||||
|
oldEntry = _popOldEntry();
|
||||||
}
|
}
|
||||||
_destructElementsInParallel(&entriesToDelete);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
template<class Key, class Value>
|
template<class Key, class Value>
|
||||||
void Cache<Key, Value>::_destructElementsInParallel(std::vector<CacheEntry<Key, Value>> *list) {
|
boost::optional<Value> Cache<Key, Value>::_popOldEntry() {
|
||||||
//TODO Check whether this parallel destruction below works (just comment it in but keep the list->clear()) and check performance impacts. Is it better to have a lower parallelity level, i.e. #core threads?
|
std::lock_guard<std::mutex> lock(_mutex);
|
||||||
/*
|
if (_cachedBlocks.size() > 0 && _cachedBlocks.peek()->ageSeconds() > PURGE_LIFETIME_SEC) {
|
||||||
std::vector<std::future<void>> waitHandles;
|
return _cachedBlocks.pop()->releaseValue();
|
||||||
for (auto & entry : *list) {
|
} else {
|
||||||
waitHandles.push_back(std::async(std::launch::async, [&entry] {
|
return boost::none;
|
||||||
entry.releaseValue();
|
|
||||||
}));
|
|
||||||
}
|
}
|
||||||
for (auto & waitHandle : waitHandles) {
|
};
|
||||||
waitHandle.wait();
|
|
||||||
}
|
|
||||||
*/
|
|
||||||
list->clear();
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -17,6 +17,9 @@ PeriodicTask::PeriodicTask(function<void ()> task, double intervalSec) : _thread
|
|||||||
}
|
}
|
||||||
} catch (const boost::thread_interrupted &e) {
|
} catch (const boost::thread_interrupted &e) {
|
||||||
//Do nothing, exit thread.
|
//Do nothing, exit thread.
|
||||||
|
} catch (const std::exception &e) {
|
||||||
|
//TODO Think about logging
|
||||||
|
cerr << "PeriodicTask crashed: " << e.what() << endl;
|
||||||
} catch (...) {
|
} catch (...) {
|
||||||
//TODO Think about logging
|
//TODO Think about logging
|
||||||
cerr << "PeriodicTask crashed" << endl;
|
cerr << "PeriodicTask crashed" << endl;
|
||||||
|
Loading…
x
Reference in New Issue
Block a user