Cache destructs elements in parallel in destructor

This commit is contained in:
Sebastian Messmer 2015-10-14 14:40:45 +02:00
parent 6fd2727592
commit 2ccdcb5b98
3 changed files with 42 additions and 8 deletions

View File

@ -26,7 +26,9 @@ Key CachingBlockStore::createKey() {
} }
optional<unique_ref<Block>> CachingBlockStore::tryCreate(const Key &key, Data data) { optional<unique_ref<Block>> CachingBlockStore::tryCreate(const Key &key, Data data) {
ASSERT(_cache.pop(key) == none, "Key already exists in cache");
//TODO Shouldn't we return boost::none if the key already exists? //TODO Shouldn't we return boost::none if the key already exists?
//TODO Key can also already exist but not be in the cache right now.
++_numNewBlocks; ++_numNewBlocks;
return unique_ref<Block>(make_unique_ref<CachedBlock>(make_unique_ref<NewBlock>(key, std::move(data), this), this)); return unique_ref<Block>(make_unique_ref<CachedBlock>(make_unique_ref<NewBlock>(key, std::move(data), this), this));
} }
@ -82,5 +84,9 @@ void CachingBlockStore::removeFromBaseStore(cpputils::unique_ref<Block> block) {
_baseBlockStore->remove(std::move(block)); _baseBlockStore->remove(std::move(block));
} }
void CachingBlockStore::flush() {
_cache.flush();
}
} }
} }

View File

@ -24,6 +24,8 @@ public:
boost::optional<cpputils::unique_ref<Block>> tryCreateInBaseStore(const Key &key, cpputils::Data data); boost::optional<cpputils::unique_ref<Block>> tryCreateInBaseStore(const Key &key, cpputils::Data data);
void removeFromBaseStore(cpputils::unique_ref<Block> block); void removeFromBaseStore(cpputils::unique_ref<Block> block);
void flush();
private: private:
cpputils::unique_ref<BlockStore> _baseBlockStore; cpputils::unique_ref<BlockStore> _baseBlockStore;
Cache<Key, cpputils::unique_ref<Block>, 1000> _cache; Cache<Key, cpputils::unique_ref<Block>, 1000> _cache;

View File

@ -30,12 +30,16 @@ public:
void push(const Key &key, Value value); void push(const Key &key, Value value);
boost::optional<Value> pop(const Key &key); boost::optional<Value> pop(const Key &key);
void flush();
private: private:
void _makeSpaceForEntry(std::unique_lock<std::mutex> *lock); void _makeSpaceForEntry(std::unique_lock<std::mutex> *lock);
void _deleteEntry(std::unique_lock<std::mutex> *lock); void _deleteEntry(std::unique_lock<std::mutex> *lock);
void _deleteOldEntriesParallel(); void _deleteOldEntriesParallel();
void _deleteOldEntries(); void _deleteAllEntriesParallel();
bool _deleteOldEntry(); void _deleteMatchingEntriesAtBeginningParallel(std::function<bool (const CacheEntry<Key, Value> &)> matches);
void _deleteMatchingEntriesAtBeginning(std::function<bool (const CacheEntry<Key, Value> &)> matches);
bool _deleteMatchingEntryAtBeginning(std::function<bool (const CacheEntry<Key, Value> &)> matches);
mutable std::mutex _mutex; mutable std::mutex _mutex;
cpputils::LockPool<Key> _currentlyFlushingEntries; cpputils::LockPool<Key> _currentlyFlushingEntries;
@ -56,6 +60,8 @@ Cache<Key, Value, MAX_ENTRIES>::Cache(): _cachedBlocks(), _timeoutFlusher(nullpt
template<class Key, class Value, uint32_t MAX_ENTRIES> template<class Key, class Value, uint32_t MAX_ENTRIES>
Cache<Key, Value, MAX_ENTRIES>::~Cache() { Cache<Key, Value, MAX_ENTRIES>::~Cache() {
_deleteAllEntriesParallel();
ASSERT(_cachedBlocks.size() == 0, "Error in _deleteAllEntriesParallel()");
} }
template<class Key, class Value, uint32_t MAX_ENTRIES> template<class Key, class Value, uint32_t MAX_ENTRIES>
@ -104,13 +110,27 @@ void Cache<Key, Value, MAX_ENTRIES>::_deleteEntry(std::unique_lock<std::mutex> *
lock->lock(); lock->lock();
}; };
template<class Key, class Value, uint32_t MAX_ENTRIES>
void Cache<Key, Value, MAX_ENTRIES>::_deleteAllEntriesParallel() {
return _deleteMatchingEntriesAtBeginningParallel([] (const CacheEntry<Key, Value> &) {
return true;
});
}
template<class Key, class Value, uint32_t MAX_ENTRIES> template<class Key, class Value, uint32_t MAX_ENTRIES>
void Cache<Key, Value, MAX_ENTRIES>::_deleteOldEntriesParallel() { void Cache<Key, Value, MAX_ENTRIES>::_deleteOldEntriesParallel() {
return _deleteMatchingEntriesAtBeginningParallel([] (const CacheEntry<Key, Value> &entry) {
return entry.ageSeconds() > PURGE_LIFETIME_SEC;
});
}
template<class Key, class Value, uint32_t MAX_ENTRIES>
void Cache<Key, Value, MAX_ENTRIES>::_deleteMatchingEntriesAtBeginningParallel(std::function<bool (const CacheEntry<Key, Value> &)> matches) {
unsigned int numThreads = std::max(1u, std::thread::hardware_concurrency()); unsigned int numThreads = std::max(1u, std::thread::hardware_concurrency());
std::vector<std::future<void>> waitHandles; std::vector<std::future<void>> waitHandles;
for (unsigned int i = 0; i < numThreads; ++i) { for (unsigned int i = 0; i < numThreads; ++i) {
waitHandles.push_back(std::async(std::launch::async, [this] { waitHandles.push_back(std::async(std::launch::async, [this, matches] {
_deleteOldEntries(); _deleteMatchingEntriesAtBeginning(matches);
})); }));
} }
for (auto & waitHandle : waitHandles) { for (auto & waitHandle : waitHandles) {
@ -119,16 +139,16 @@ void Cache<Key, Value, MAX_ENTRIES>::_deleteOldEntriesParallel() {
}; };
template<class Key, class Value, uint32_t MAX_ENTRIES> template<class Key, class Value, uint32_t MAX_ENTRIES>
void Cache<Key, Value, MAX_ENTRIES>::_deleteOldEntries() { void Cache<Key, Value, MAX_ENTRIES>::_deleteMatchingEntriesAtBeginning(std::function<bool (const CacheEntry<Key, Value> &)> matches) {
while (_deleteOldEntry()) {} while (_deleteMatchingEntryAtBeginning(matches)) {}
} }
template<class Key, class Value, uint32_t MAX_ENTRIES> template<class Key, class Value, uint32_t MAX_ENTRIES>
bool Cache<Key, Value, MAX_ENTRIES>::_deleteOldEntry() { bool Cache<Key, Value, MAX_ENTRIES>::_deleteMatchingEntryAtBeginning(std::function<bool (const CacheEntry<Key, Value> &)> matches) {
// This function can be called in parallel by multiple threads and will then cause the Value destructors // This function can be called in parallel by multiple threads and will then cause the Value destructors
// to be called in parallel. The call to _deleteEntry() releases the lock while the Value destructor is running. // to be called in parallel. The call to _deleteEntry() releases the lock while the Value destructor is running.
std::unique_lock<std::mutex> lock(_mutex); std::unique_lock<std::mutex> lock(_mutex);
if (_cachedBlocks.size() > 0 && _cachedBlocks.peek()->ageSeconds() > PURGE_LIFETIME_SEC) { if (_cachedBlocks.size() > 0 && matches(*_cachedBlocks.peek())) {
_deleteEntry(&lock); _deleteEntry(&lock);
return true; return true;
} else { } else {
@ -142,6 +162,12 @@ uint32_t Cache<Key, Value, MAX_ENTRIES>::size() const {
return _cachedBlocks.size(); return _cachedBlocks.size();
}; };
template<class Key, class Value, uint32_t MAX_ENTRIES>
void Cache<Key, Value, MAX_ENTRIES>::flush() {
//TODO Test flush()
return _deleteAllEntriesParallel();
};
} }
} }