diff --git a/implementations/caching/Cache.cpp b/implementations/caching/Cache.cpp index c37adb6f..63df6758 100644 --- a/implementations/caching/Cache.cpp +++ b/implementations/caching/Cache.cpp @@ -1,4 +1,5 @@ #include "Cache.h" +#include "PeriodicTask.h" using std::unique_ptr; using std::make_unique; @@ -10,8 +11,14 @@ namespace blockstore { namespace caching { constexpr uint32_t Cache::MAX_ENTRIES; +constexpr double Cache::PURGE_LIFETIME_SEC; +constexpr double Cache::PURGE_INTERVAL; +constexpr double Cache::MAX_LIFETIME_SEC; -Cache::Cache(): _cachedBlocks() { +Cache::Cache(): _cachedBlocks(), _timeoutFlusher(nullptr) { + //Don't initialize timeoutFlusher in the initializer list, + //because it then might already call Cache::popOldEntries() before Cache is done constructing + _timeoutFlusher = make_unique(std::bind(&Cache::_popOldEntries, this), PURGE_INTERVAL); } Cache::~Cache() { @@ -38,5 +45,14 @@ void Cache::push(unique_ptr block) { _cachedBlocks.push(key, make_unique(std::move(block))); } +void Cache::_popOldEntries() { + lock_guard lock(_mutex); + while(_cachedBlocks.size() > 0 && _cachedBlocks.peek().ageSeconds() > PURGE_LIFETIME_SEC) { + double age = _cachedBlocks.peek().ageSeconds(); + printf("Removing block with age: %f\n", age); + _cachedBlocks.pop(); + } +} + } } diff --git a/implementations/caching/Cache.h b/implementations/caching/Cache.h index 2e293ddc..7839b086 100644 --- a/implementations/caching/Cache.h +++ b/implementations/caching/Cache.h @@ -10,6 +10,7 @@ namespace blockstore { namespace caching { +class PeriodicTask; //TODO Test //TODO Also throw blocks out after a timeout @@ -17,6 +18,10 @@ namespace caching { class Cache { public: static constexpr uint32_t MAX_ENTRIES = 1000; + //TODO Experiment with good values + static constexpr double PURGE_LIFETIME_SEC = 0.5; //When an entry has this age, it will be purged from the cache + static constexpr double PURGE_INTERVAL = 0.5; // With this interval, we check for entries to purge + static constexpr double MAX_LIFETIME_SEC = PURGE_LIFETIME_SEC + PURGE_INTERVAL; // This is the oldest age an entry can reach (given purging works in an ideal world, i.e. with the ideal interval and in zero time) Cache(); virtual ~Cache(); @@ -25,8 +30,12 @@ public: std::unique_ptr pop(const Key &key); private: + void _popOldEntries(); + + mutable std::mutex _mutex; QueueMap _cachedBlocks; + std::unique_ptr _timeoutFlusher; }; } diff --git a/implementations/caching/CacheEntry.h b/implementations/caching/CacheEntry.h index ce19d85e..7c11f0c0 100644 --- a/implementations/caching/CacheEntry.h +++ b/implementations/caching/CacheEntry.h @@ -5,6 +5,7 @@ #include #include #include +#include namespace blockstore { class Block; @@ -12,13 +13,13 @@ namespace caching { class CacheEntry { public: - CacheEntry(std::unique_ptr block): _lastAccess(time(nullptr)), _block(std::move(block)) { + CacheEntry(std::unique_ptr block): _lastAccess(currentTime()), _block(std::move(block)) { } CacheEntry(CacheEntry &&) = default; double ageSeconds() const { - return difftime(time(nullptr), _lastAccess); + return ((double)(currentTime() - _lastAccess).total_nanoseconds()) / ((double)1000000000); } std::unique_ptr releaseBlock() { @@ -34,10 +35,14 @@ public: } private: - time_t _lastAccess; + boost::posix_time::ptime _lastAccess; std::unique_ptr _block; const CacheEntry *_nextEntry; + static boost::posix_time::ptime currentTime() { + return boost::posix_time::microsec_clock::local_time(); + } + DISALLOW_COPY_AND_ASSIGN(CacheEntry); }; diff --git a/implementations/caching/PeriodicTask.cpp b/implementations/caching/PeriodicTask.cpp new file mode 100644 index 00000000..1b260530 --- /dev/null +++ b/implementations/caching/PeriodicTask.cpp @@ -0,0 +1,32 @@ +#include "PeriodicTask.h" + +using std::function; +using std::cerr; +using std::endl; + +namespace blockstore { +namespace caching { + +PeriodicTask::PeriodicTask(function task, double intervalSec) : _thread(), _task(task), _intervalSec(intervalSec) { + _thread = boost::thread([this]() { + boost::chrono::nanoseconds interval((uint64_t)(UINT64_C(1000000000) * _intervalSec)); + try { + while(true) { + boost::this_thread::sleep_for(interval); + _task(); + } + } catch (const boost::thread_interrupted &e) { + //Do nothing, exit thread. + } catch (...) { + cerr << "PeriodicTask crashed" << endl; + } + }); +} + +PeriodicTask::~PeriodicTask() { + _thread.interrupt(); + _thread.join(); +} + +} +} diff --git a/implementations/caching/PeriodicTask.h b/implementations/caching/PeriodicTask.h new file mode 100644 index 00000000..4ad221e2 --- /dev/null +++ b/implementations/caching/PeriodicTask.h @@ -0,0 +1,25 @@ +#ifndef MESSMER_BLOCKSTORE_IMPLEMENTATIONS_CACHING_PERIODICTASK_H_ +#define MESSMER_BLOCKSTORE_IMPLEMENTATIONS_CACHING_PERIODICTASK_H_ + +#include +#include + +namespace blockstore { +namespace caching { + +//TODO Test cases +class PeriodicTask { +public: + PeriodicTask(std::function task, double intervalSec); + virtual ~PeriodicTask(); + +private: + boost::thread _thread; + std::function _task; + double _intervalSec; +}; + +} +} + +#endif diff --git a/implementations/caching/QueueMap.h b/implementations/caching/QueueMap.h index 523dada6..758b2bd4 100644 --- a/implementations/caching/QueueMap.h +++ b/implementations/caching/QueueMap.h @@ -41,6 +41,10 @@ public: return pop(*_sentinel.next->key); } + const Value &peek() { + return *_sentinel.next->value; + } + uint32_t size() { return _entries.size(); }