#pragma once
#ifndef MESSMER_BLOCKSTORE_IMPLEMENTATIONS_CACHING_CACHE_CACHE_H_
#define MESSMER_BLOCKSTORE_IMPLEMENTATIONS_CACHING_CACHE_CACHE_H_

#include "CacheEntry.h"
#include "QueueMap.h"
#include "PeriodicTask.h"
#include <memory>
#include <mutex>
#include <functional>
#include <future>
#include <thread>
#include <vector>
#include <algorithm>
#include <cstdint>
#include <boost/optional.hpp>
// The cpp-utils include paths below are assumed; adjust them to the project layout.
#include <cpp-utils/assert/assert.h>
#include <cpp-utils/lock/LockPool.h>
#include <cpp-utils/lock/MutexPoolLock.h>
#include <cpp-utils/macros.h>

namespace blockstore {
namespace caching {

// A thread-safe cache holding at most MAX_ENTRIES entries. Entries older than PURGE_LIFETIME_SEC
// are evicted by a background PeriodicTask that runs every PURGE_INTERVAL seconds; the oldest
// entry is also evicted when space is needed for a push().
template<class Key, class Value, uint32_t MAX_ENTRIES>
class Cache final {
public:
  //TODO The current MAX_LIFETIME_SEC only considers the time since the element was last pushed to the Cache. Also introduce a real MAX_LIFETIME_SEC that forces a resync of entries that have been pushed/popped often (e.g. the root blob).
  //TODO Experiment with good values
  static constexpr double PURGE_LIFETIME_SEC = 0.5; // Once an entry reaches this age, it will be purged from the cache
  static constexpr double PURGE_INTERVAL = 0.5; // With this interval, we check for entries to purge
  static constexpr double MAX_LIFETIME_SEC = PURGE_LIFETIME_SEC + PURGE_INTERVAL; // This is the oldest age an entry can reach (assuming purging works ideally, i.e. at the ideal interval and in zero time)

  Cache();
  ~Cache();

  uint32_t size() const;
  void push(const Key &key, Value value);
  boost::optional<Value> pop(const Key &key);
  void flush();

private:
  void _makeSpaceForEntry(std::unique_lock<std::mutex> *lock);
  void _deleteEntry(std::unique_lock<std::mutex> *lock);
  void _deleteOldEntriesParallel();
  void _deleteAllEntriesParallel();
  void _deleteMatchingEntriesAtBeginningParallel(std::function<bool (const CacheEntry<Key, Value> &)> matches);
  void _deleteMatchingEntriesAtBeginning(std::function<bool (const CacheEntry<Key, Value> &)> matches);
  bool _deleteMatchingEntryAtBeginning(std::function<bool (const CacheEntry<Key, Value> &)> matches);

  mutable std::mutex _mutex;
  cpputils::LockPool<Key> _currentlyFlushingEntries;
  QueueMap<Key, CacheEntry<Key, Value>> _cachedBlocks;
  std::unique_ptr<PeriodicTask> _timeoutFlusher;

  DISALLOW_COPY_AND_ASSIGN(Cache);
};

template<class Key, class Value, uint32_t MAX_ENTRIES> constexpr double Cache<Key, Value, MAX_ENTRIES>::PURGE_LIFETIME_SEC;
template<class Key, class Value, uint32_t MAX_ENTRIES> constexpr double Cache<Key, Value, MAX_ENTRIES>::PURGE_INTERVAL;
template<class Key, class Value, uint32_t MAX_ENTRIES> constexpr double Cache<Key, Value, MAX_ENTRIES>::MAX_LIFETIME_SEC;

template<class Key, class Value, uint32_t MAX_ENTRIES>
Cache<Key, Value, MAX_ENTRIES>::Cache(): _mutex(), _currentlyFlushingEntries(), _cachedBlocks(), _timeoutFlusher(nullptr) {
  // Don't initialize _timeoutFlusher in the initializer list,
  // because it might then already call Cache::_deleteOldEntriesParallel() before the Cache is done constructing.
  _timeoutFlusher = std::make_unique<PeriodicTask>(std::bind(&Cache::_deleteOldEntriesParallel, this), PURGE_INTERVAL);
}

template<class Key, class Value, uint32_t MAX_ENTRIES>
Cache<Key, Value, MAX_ENTRIES>::~Cache() {
  _deleteAllEntriesParallel();
  ASSERT(_cachedBlocks.size() == 0, "Error in _deleteAllEntriesParallel()");
}

template<class Key, class Value, uint32_t MAX_ENTRIES>
boost::optional<Value> Cache<Key, Value, MAX_ENTRIES>::pop(const Key &key) {
  std::unique_lock<std::mutex> lock(_mutex);
  cpputils::MutexPoolLock<Key> lockEntryFromBeingPopped(&_currentlyFlushingEntries, key, &lock);
  auto found = _cachedBlocks.pop(key);
  if (!found) {
    return boost::none;
  }
  return found->releaseValue();
}

template<class Key, class Value, uint32_t MAX_ENTRIES>
void Cache<Key, Value, MAX_ENTRIES>::push(const Key &key, Value value) {
  std::unique_lock<std::mutex> lock(_mutex);
  ASSERT(_cachedBlocks.size() <= MAX_ENTRIES, "Cache too full");
  _makeSpaceForEntry(&lock);
  _cachedBlocks.push(key, CacheEntry<Key, Value>(std::move(value)));
}

template<class Key, class Value, uint32_t MAX_ENTRIES>
void Cache<Key, Value, MAX_ENTRIES>::_makeSpaceForEntry(std::unique_lock<std::mutex> *lock) {
  // _deleteEntry releases the lock while the Value destructor is running.
  // So we can destruct multiple entries in parallel and also call pop() or push() while doing so.
  // However, if another thread calls push() before we get the lock back, the cache is full again.
  // That's why we need the while() loop here.
  while (_cachedBlocks.size() == MAX_ENTRIES) {
    _deleteEntry(lock);
  }
  ASSERT(_cachedBlocks.size() < MAX_ENTRIES, "Removing entry from cache didn't work");
}

template<class Key, class Value, uint32_t MAX_ENTRIES>
void Cache<Key, Value, MAX_ENTRIES>::_deleteEntry(std::unique_lock<std::mutex> *lock) {
  auto key = _cachedBlocks.peekKey();
  ASSERT(key != boost::none, "There was no entry to delete");
  cpputils::MutexPoolLock<Key> lockEntryFromBeingPopped(&_currentlyFlushingEntries, *key);
  auto value = _cachedBlocks.pop();
  // Call the destructor outside of the unique_lock,
  // i.e. pop() and push() can be called here, except for pop() on the element in _currentlyFlushingEntries.
  lock->unlock();
  value = boost::none; // Call the destructor
  lock->lock();
}

template<class Key, class Value, uint32_t MAX_ENTRIES>
void Cache<Key, Value, MAX_ENTRIES>::_deleteAllEntriesParallel() {
  return _deleteMatchingEntriesAtBeginningParallel([] (const CacheEntry<Key, Value> &) {
    return true;
  });
}

template<class Key, class Value, uint32_t MAX_ENTRIES>
void Cache<Key, Value, MAX_ENTRIES>::_deleteOldEntriesParallel() {
  return _deleteMatchingEntriesAtBeginningParallel([] (const CacheEntry<Key, Value> &entry) {
    return entry.ageSeconds() > PURGE_LIFETIME_SEC;
  });
}

template<class Key, class Value, uint32_t MAX_ENTRIES>
void Cache<Key, Value, MAX_ENTRIES>::_deleteMatchingEntriesAtBeginningParallel(std::function<bool (const CacheEntry<Key, Value> &)> matches) {
  // Use twice the number of cores, so the CPU stays fully used even if half of the threads are doing I/O
  unsigned int numThreads = 2 * std::max(1u, std::thread::hardware_concurrency());
  std::vector<std::future<void>> waitHandles;
  for (unsigned int i = 0; i < numThreads; ++i) {
    waitHandles.push_back(std::async(std::launch::async, [this, matches] {
      _deleteMatchingEntriesAtBeginning(matches);
    }));
  }
  for (auto &waitHandle : waitHandles) {
    waitHandle.wait();
  }
}

template<class Key, class Value, uint32_t MAX_ENTRIES>
void Cache<Key, Value, MAX_ENTRIES>::_deleteMatchingEntriesAtBeginning(std::function<bool (const CacheEntry<Key, Value> &)> matches) {
  while (_deleteMatchingEntryAtBeginning(matches)) {}
}

template<class Key, class Value, uint32_t MAX_ENTRIES>
bool Cache<Key, Value, MAX_ENTRIES>::_deleteMatchingEntryAtBeginning(std::function<bool (const CacheEntry<Key, Value> &)> matches) {
  // This function can be called in parallel by multiple threads and will then cause the Value destructors
  // to be called in parallel. The call to _deleteEntry() releases the lock while the Value destructor is running.
  std::unique_lock<std::mutex> lock(_mutex);
  if (_cachedBlocks.size() > 0 && matches(*_cachedBlocks.peek())) {
    _deleteEntry(&lock);
    return true;
  } else {
    return false;
  }
}

template<class Key, class Value, uint32_t MAX_ENTRIES>
uint32_t Cache<Key, Value, MAX_ENTRIES>::size() const {
  std::unique_lock<std::mutex> lock(_mutex);
  return _cachedBlocks.size();
}

template<class Key, class Value, uint32_t MAX_ENTRIES>
void Cache<Key, Value, MAX_ENTRIES>::flush() {
  //TODO Test flush()
  return _deleteAllEntriesParallel();
}

}
}

#endif
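
// Usage sketch (illustrative only, not part of the header's interface; the key/value
// types and the entry limit below are arbitrary placeholders):
//
//   blockstore::caching::Cache<std::string, int, 1000> cache;
//   cache.push("block1", 42);                         // insert an entry; evicts the oldest one if the cache is full
//   boost::optional<int> value = cache.pop("block1"); // removes and returns the value, or boost::none if not cached
//   cache.flush();                                    // evicts (and destructs) all remaining entries in parallel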