2015-04-27 22:46:57 +02:00
# pragma once
2015-10-15 13:09:21 +02:00
# ifndef MESSMER_BLOCKSTORE_IMPLEMENTATIONS_CACHING_CACHE_CACHE_H_
# define MESSMER_BLOCKSTORE_IMPLEMENTATIONS_CACHING_CACHE_CACHE_H_
2015-04-27 22:46:57 +02:00
# include "CacheEntry.h"
# include "QueueMap.h"
# include "PeriodicTask.h"
# include <memory>
# include <boost/optional.hpp>
2015-05-16 13:45:25 +02:00
# include <future>
2015-07-22 13:42:07 +02:00
# include <messmer/cpp-utils/assert/assert.h>
2015-10-01 15:52:43 +02:00
# include <messmer/cpp-utils/lock/MutexPoolLock.h>
2015-04-27 22:46:57 +02:00
namespace blockstore {
namespace caching {
2015-10-07 17:24:13 +02:00
template < class Key , class Value , uint32_t MAX_ENTRIES >
2015-04-27 22:46:57 +02:00
class Cache {
public :
//TODO Experiment with good values
static constexpr double PURGE_LIFETIME_SEC = 0.5 ; //When an entry has this age, it will be purged from the cache
static constexpr double PURGE_INTERVAL = 0.5 ; // With this interval, we check for entries to purge
static constexpr double MAX_LIFETIME_SEC = PURGE_LIFETIME_SEC + PURGE_INTERVAL ; // This is the oldest age an entry can reach (given purging works in an ideal world, i.e. with the ideal interval and in zero time)
Cache ( ) ;
virtual ~ Cache ( ) ;
2015-10-05 16:51:36 +02:00
uint32_t size ( ) const ;
2015-04-27 22:46:57 +02:00
void push ( const Key & key , Value value ) ;
boost : : optional < Value > pop ( const Key & key ) ;
2015-10-14 14:40:45 +02:00
void flush ( ) ;
2015-04-27 22:46:57 +02:00
private :
2015-10-01 13:51:01 +02:00
void _makeSpaceForEntry ( std : : unique_lock < std : : mutex > * lock ) ;
void _deleteEntry ( std : : unique_lock < std : : mutex > * lock ) ;
void _deleteOldEntriesParallel ( ) ;
2015-10-14 14:40:45 +02:00
void _deleteAllEntriesParallel ( ) ;
void _deleteMatchingEntriesAtBeginningParallel ( std : : function < bool ( const CacheEntry < Key , Value > & ) > matches ) ;
void _deleteMatchingEntriesAtBeginning ( std : : function < bool ( const CacheEntry < Key , Value > & ) > matches ) ;
bool _deleteMatchingEntryAtBeginning ( std : : function < bool ( const CacheEntry < Key , Value > & ) > matches ) ;
2015-10-01 13:51:01 +02:00
mutable std : : mutex _mutex ;
cpputils : : LockPool < Key > _currentlyFlushingEntries ;
2015-04-27 22:46:57 +02:00
QueueMap < Key , CacheEntry < Key , Value > > _cachedBlocks ;
std : : unique_ptr < PeriodicTask > _timeoutFlusher ;
} ;
2015-10-07 17:24:13 +02:00
// Out-of-class definitions of the static constexpr members declared in Cache.
// Before C++17 these are needed whenever the constants are odr-used (e.g. bound to a reference).
template<class Key, class Value, uint32_t MAX_ENTRIES>
constexpr double Cache<Key, Value, MAX_ENTRIES>::PURGE_LIFETIME_SEC;

template<class Key, class Value, uint32_t MAX_ENTRIES>
constexpr double Cache<Key, Value, MAX_ENTRIES>::PURGE_INTERVAL;

template<class Key, class Value, uint32_t MAX_ENTRIES>
constexpr double Cache<Key, Value, MAX_ENTRIES>::MAX_LIFETIME_SEC;
2015-04-27 22:46:57 +02:00
2015-10-07 17:24:13 +02:00
// Creates an empty cache and starts the periodic task that purges old entries.
template<class Key, class Value, uint32_t MAX_ENTRIES>
Cache<Key, Value, MAX_ENTRIES>::Cache()
    : _cachedBlocks(), _timeoutFlusher(nullptr) {
  // The periodic task is deliberately created here and not in the initializer list:
  // otherwise it might already call _deleteOldEntriesParallel() before Cache is done constructing.
  _timeoutFlusher = std::make_unique<PeriodicTask>(
      [this] { _deleteOldEntriesParallel(); },
      PURGE_INTERVAL);
}
2015-10-07 17:24:13 +02:00
// Deletes all remaining entries (using multiple threads) before the members are torn down.
template<class Key, class Value, uint32_t MAX_ENTRIES>
Cache<Key, Value, MAX_ENTRIES>::~Cache() {
  _deleteAllEntriesParallel();
  // Sanity check: nothing may be left in the cache at this point.
  ASSERT(_cachedBlocks.size() == 0, " Error in _deleteAllEntriesParallel() ");
}
2015-10-07 17:24:13 +02:00
// Removes the entry stored under key from the cache.
// Returns its value, or boost::none if the cache has no entry for this key.
template<class Key, class Value, uint32_t MAX_ENTRIES>
boost::optional<Value> Cache<Key, Value, MAX_ENTRIES>::pop(const Key &key) {
  std::unique_lock<std::mutex> cacheLock(_mutex);
  // Take the per-key lock, so we don't pop an entry for this key while _deleteEntry() is still
  // destructing one. The cache lock is handed over as well - presumably so that it can be
  // released while waiting for the per-key lock (TODO confirm in MutexPoolLock).
  cpputils::MutexPoolLock<Key> keyLock(&_currentlyFlushingEntries, key, &cacheLock);

  auto entry = _cachedBlocks.pop(key);
  if (entry) {
    return entry->releaseValue();
  }
  return boost::none;
}
2015-10-07 17:24:13 +02:00
// Adds value to the cache under key. If the cache is full, entries are
// deleted from the front of the queue until there is space for the new one.
template<class Key, class Value, uint32_t MAX_ENTRIES>
void Cache<Key, Value, MAX_ENTRIES>::push(const Key &key, Value value) {
  std::unique_lock<std::mutex> cacheLock(_mutex);
  ASSERT(_cachedBlocks.size() <= MAX_ENTRIES, " Cache too full ");
  // May temporarily release cacheLock while old values are being destructed.
  _makeSpaceForEntry(&cacheLock);
  _cachedBlocks.push(key, CacheEntry<Key, Value>(std::move(value)));
}
2015-10-07 17:24:13 +02:00
// Deletes entries until the cache holds fewer than MAX_ENTRIES entries.
// lock must be a held lock on _mutex; it is held again when this function returns.
template<class Key, class Value, uint32_t MAX_ENTRIES>
void Cache<Key, Value, MAX_ENTRIES>::_makeSpaceForEntry(std::unique_lock<std::mutex> *lock) {
  // A loop is needed instead of a single deletion: _deleteEntry() gives up the lock while the
  // Value destructor runs (which allows destructing several entries in parallel and calling
  // pop() or push() meanwhile). If another thread calls push() before we have the lock back,
  // the cache is full again and we have to delete one more entry.
  while (_cachedBlocks.size() == MAX_ENTRIES) {
    _deleteEntry(lock);
  }
  ASSERT(_cachedBlocks.size() < MAX_ENTRIES, " Removing entry from cache didn't work ");
}
2015-10-07 17:24:13 +02:00
// Removes the entry at the front of the queue and destructs its value.
//
// lock must be a held lock on _mutex. It is released while the Value destructor is running
// (so other threads can call pop() and push() meanwhile, except for pop() on the key that is
// locked in _currentlyFlushingEntries) and is held again when this function returns.
template<class Key, class Value, uint32_t MAX_ENTRIES>
void Cache<Key, Value, MAX_ENTRIES>::_deleteEntry(std::unique_lock<std::mutex> *lock) {
  auto key = _cachedBlocks.peekKey();
  ASSERT(key != boost::none, " There was no entry to delete ");
  {
    cpputils::MutexPoolLock<Key> lockEntryFromBeingPopped(&_currentlyFlushingEntries, *key);
    auto value = _cachedBlocks.pop();
    // Call destructor outside of the unique_lock,
    // i.e. pop() and push() can be called here, except for pop() on the element in _currentlyFlushingEntries
    lock->unlock();
    value = boost::none; // Call destructor
  }
  // The per-key lock has to be released (end of the scope above) BEFORE we re-acquire the cache
  // mutex. push() doesn't look at _currentlyFlushingEntries, so the same key can be pushed again
  // while its old value is still being destructed here. A second thread that then calls
  // _deleteEntry() for that key holds the cache mutex and waits for the per-key lock; if we
  // were still holding the per-key lock while waiting for the cache mutex, both threads
  // would wait for each other forever.
  lock->lock();
}
2015-10-14 14:40:45 +02:00
// Deletes every entry in the cache, using multiple threads.
template<class Key, class Value, uint32_t MAX_ENTRIES>
void Cache<Key, Value, MAX_ENTRIES>::_deleteAllEntriesParallel() {
  // Predicate that accepts each entry, no matter what it contains.
  const auto matchesEverything = [] (const CacheEntry<Key, Value> &) {
    return true;
  };
  _deleteMatchingEntriesAtBeginningParallel(matchesEverything);
}
2015-10-07 17:24:13 +02:00
// Deletes entries from the front of the queue as long as they are older than
// PURGE_LIFETIME_SEC, using multiple threads. This is what the periodic task runs.
template<class Key, class Value, uint32_t MAX_ENTRIES>
void Cache<Key, Value, MAX_ENTRIES>::_deleteOldEntriesParallel() {
  const auto isTooOld = [] (const CacheEntry<Key, Value> &entry) {
    return entry.ageSeconds() > PURGE_LIFETIME_SEC;
  };
  _deleteMatchingEntriesAtBeginningParallel(isTooOld);
}
// Starts several asynchronous workers that each run _deleteMatchingEntriesAtBeginning(matches)
// and returns once all of them have finished.
template<class Key, class Value, uint32_t MAX_ENTRIES>
void Cache<Key, Value, MAX_ENTRIES>::_deleteMatchingEntriesAtBeginningParallel(std::function<bool (const CacheEntry<Key, Value> &)> matches) {
  // Twice the number of cores, so we use full CPU even if half the threads are doing I/O.
  // hardware_concurrency() may return 0, that's why we use at least 1 here.
  const unsigned int workerCount = 2 * std::max(1u, std::thread::hardware_concurrency());

  std::vector<std::future<void>> workers;
  for (unsigned int started = 0; started < workerCount; ++started) {
    workers.emplace_back(std::async(std::launch::async, [this, matches] {
      _deleteMatchingEntriesAtBeginning(matches);
    }));
  }

  // Block until each worker is done.
  for (auto &worker : workers) {
    worker.wait();
  }
}
2015-10-07 17:24:13 +02:00
template < class Key , class Value , uint32_t MAX_ENTRIES >
2015-10-14 14:40:45 +02:00
void Cache < Key , Value , MAX_ENTRIES > : : _deleteMatchingEntriesAtBeginning ( std : : function < bool ( const CacheEntry < Key , Value > & ) > matches ) {
while ( _deleteMatchingEntryAtBeginning ( matches ) ) { }
2015-05-16 13:45:25 +02:00
}
2015-10-07 17:24:13 +02:00
template < class Key , class Value , uint32_t MAX_ENTRIES >
2015-10-14 14:40:45 +02:00
bool Cache < Key , Value , MAX_ENTRIES > : : _deleteMatchingEntryAtBeginning ( std : : function < bool ( const CacheEntry < Key , Value > & ) > matches ) {
2015-10-01 13:51:01 +02:00
// This function can be called in parallel by multiple threads and will then cause the Value destructors
// to be called in parallel. The call to _deleteEntry() releases the lock while the Value destructor is running.
std : : unique_lock < std : : mutex > lock ( _mutex ) ;
2015-10-14 14:40:45 +02:00
if ( _cachedBlocks . size ( ) > 0 & & matches ( * _cachedBlocks . peek ( ) ) ) {
2015-10-01 13:51:01 +02:00
_deleteEntry ( & lock ) ;
return true ;
2015-09-29 20:01:51 +02:00
} else {
2015-10-01 13:51:01 +02:00
return false ;
2015-04-27 22:46:57 +02:00
}
2015-09-29 20:01:51 +02:00
} ;
2015-04-27 22:46:57 +02:00
2015-10-07 17:24:13 +02:00
// Returns the number of entries currently stored in the cache.
template<class Key, class Value, uint32_t MAX_ENTRIES>
uint32_t Cache<Key, Value, MAX_ENTRIES>::size() const {
  // The mutex is held only for the duration of the size query.
  std::lock_guard<std::mutex> cacheLock(_mutex);
  return _cachedBlocks.size();
}
2015-10-14 14:40:45 +02:00
// Deletes all entries from the cache (using multiple threads).
template<class Key, class Value, uint32_t MAX_ENTRIES>
void Cache<Key, Value, MAX_ENTRIES>::flush() {
  //TODO Test flush()
  _deleteAllEntriesParallel();
}
2015-04-27 22:46:57 +02:00
}
}
# endif