Merged blockstore
This commit is contained in:
commit f3d614c633

1 .gitignore vendored
@@ -1,3 +1,4 @@
/build
/cmake
/.idea
@@ -35,5 +35,6 @@ script:
- ./test/cpp-utils/cpp-utils-test
- ./run_with_fuse.sh ./test/fspp/fspp-test
- ./test/parallelaccessstore/parallelaccessstore-test
- ./test/blockstore/blockstore-test
after_script:
- rm run_with_fuse.sh
@@ -1,4 +1,5 @@
include_directories(${CMAKE_CURRENT_SOURCE_DIR})

add_subdirectory(cpp-utils)
add_subdirectory(fspp)
add_subdirectory(blockstore)
41 src/blockstore/CMakeLists.txt Normal file
@@ -0,0 +1,41 @@
project (blockstore)

set(SOURCES
  utils/Key.cpp
  utils/BlockStoreUtils.cpp
  utils/FileDoesntExistException.cpp
  interface/helpers/BlockStoreWithRandomKeys.cpp
  implementations/testfake/FakeBlockStore.cpp
  implementations/testfake/FakeBlock.cpp
  implementations/inmemory/InMemoryBlock.cpp
  implementations/inmemory/InMemoryBlockStore.cpp
  implementations/parallelaccess/ParallelAccessBlockStore.cpp
  implementations/parallelaccess/BlockRef.cpp
  implementations/parallelaccess/ParallelAccessBlockStoreAdapter.cpp
  implementations/compressing/CompressingBlockStore.cpp
  implementations/compressing/CompressedBlock.cpp
  implementations/compressing/compressors/RunLengthEncoding.cpp
  implementations/compressing/compressors/Gzip.cpp
  implementations/encrypted/EncryptedBlockStore.cpp
  implementations/encrypted/EncryptedBlock.cpp
  implementations/ondisk/OnDiskBlockStore.cpp
  implementations/ondisk/OnDiskBlock.cpp
  implementations/caching/CachingBlockStore.cpp
  implementations/caching/cache/PeriodicTask.cpp
  implementations/caching/cache/CacheEntry.cpp
  implementations/caching/cache/Cache.cpp
  implementations/caching/cache/QueueMap.cpp
  implementations/caching/CachedBlock.cpp
  implementations/caching/NewBlock.cpp
)

add_library(${PROJECT_NAME} STATIC ${SOURCES})

# This is needed by boost thread
if(${CMAKE_SYSTEM_NAME} MATCHES "Linux")
  target_link_libraries(${PROJECT_NAME} PRIVATE rt)
endif(${CMAKE_SYSTEM_NAME} MATCHES "Linux")

target_add_boost(${PROJECT_NAME} filesystem system thread)
target_enable_style_warnings(${PROJECT_NAME})
target_activate_cpp14(${PROJECT_NAME})
46 src/blockstore/implementations/caching/CachedBlock.cpp Normal file
@@ -0,0 +1,46 @@
#include "CachedBlock.h"
#include "CachingBlockStore.h"

using cpputils::unique_ref;

namespace blockstore {
namespace caching {

CachedBlock::CachedBlock(unique_ref<Block> baseBlock, CachingBlockStore *blockStore)
  :Block(baseBlock->key()),
   _blockStore(blockStore),
   _baseBlock(std::move(baseBlock)) {
}

CachedBlock::~CachedBlock() {
  if (_baseBlock.get() != nullptr) {
    _blockStore->release(std::move(_baseBlock));
  }
}

const void *CachedBlock::data() const {
  return _baseBlock->data();
}

void CachedBlock::write(const void *source, uint64_t offset, uint64_t size) {
  return _baseBlock->write(source, offset, size);
}

void CachedBlock::flush() {
  return _baseBlock->flush();
}

size_t CachedBlock::size() const {
  return _baseBlock->size();
}

void CachedBlock::resize(size_t newSize) {
  return _baseBlock->resize(newSize);
}

unique_ref<Block> CachedBlock::releaseBlock() {
  return std::move(_baseBlock);
}

}
}
39 src/blockstore/implementations/caching/CachedBlock.h Normal file
@@ -0,0 +1,39 @@
#pragma once
#ifndef MESSMER_BLOCKSTORE_IMPLEMENTATIONS_CACHING_CACHEDBLOCK_H_
#define MESSMER_BLOCKSTORE_IMPLEMENTATIONS_CACHING_CACHEDBLOCK_H_

#include "../../interface/Block.h"

#include <messmer/cpp-utils/pointer/unique_ref.h>

namespace blockstore {
namespace caching {
class CachingBlockStore;

class CachedBlock final: public Block {
public:
  //TODO Storing key twice (in parent class and in object pointed to). Once would be enough.
  CachedBlock(cpputils::unique_ref<Block> baseBlock, CachingBlockStore *blockStore);
  ~CachedBlock();

  const void *data() const override;
  void write(const void *source, uint64_t offset, uint64_t size) override;
  void flush() override;

  size_t size() const override;

  void resize(size_t newSize) override;

  cpputils::unique_ref<Block> releaseBlock();

private:
  CachingBlockStore *_blockStore;
  cpputils::unique_ref<Block> _baseBlock;

  DISALLOW_COPY_AND_ASSIGN(CachedBlock);
};

}
}

#endif
92 src/blockstore/implementations/caching/CachingBlockStore.cpp Normal file
@@ -0,0 +1,92 @@
#include "CachedBlock.h"
#include "NewBlock.h"
#include "CachingBlockStore.h"
#include "../../interface/Block.h"

#include <algorithm>
#include <messmer/cpp-utils/pointer/cast.h>
#include <messmer/cpp-utils/assert/assert.h>

using cpputils::dynamic_pointer_move;
using cpputils::Data;
using boost::optional;
using cpputils::unique_ref;
using cpputils::make_unique_ref;
using boost::none;

namespace blockstore {
namespace caching {

CachingBlockStore::CachingBlockStore(cpputils::unique_ref<BlockStore> baseBlockStore)
  :_baseBlockStore(std::move(baseBlockStore)), _cache(), _numNewBlocks(0) {
}

Key CachingBlockStore::createKey() {
  return _baseBlockStore->createKey();
}

optional<unique_ref<Block>> CachingBlockStore::tryCreate(const Key &key, Data data) {
  ASSERT(_cache.pop(key) == none, "Key already exists in cache");
  //TODO Shouldn't we return boost::none if the key already exists?
  //TODO Key can also already exist but not be in the cache right now.
  ++_numNewBlocks;
  return unique_ref<Block>(make_unique_ref<CachedBlock>(make_unique_ref<NewBlock>(key, std::move(data), this), this));
}

optional<unique_ref<Block>> CachingBlockStore::load(const Key &key) {
  optional<unique_ref<Block>> optBlock = _cache.pop(key);
  //TODO an optional<> class with .getOrElse() would make this code simpler. boost::optional<>::value_or_eval didn't seem to work with unique_ptr members.
  if (optBlock != none) {
    return optional<unique_ref<Block>>(make_unique_ref<CachedBlock>(std::move(*optBlock), this));
  } else {
    auto block = _baseBlockStore->load(key);
    if (block == none) {
      return none;
    } else {
      return optional<unique_ref<Block>>(make_unique_ref<CachedBlock>(std::move(*block), this));
    }
  }
}

void CachingBlockStore::remove(cpputils::unique_ref<Block> block) {
  auto cached_block = dynamic_pointer_move<CachedBlock>(block);
  ASSERT(cached_block != none, "Passed block is not a CachedBlock");
  auto baseBlock = (*cached_block)->releaseBlock();
  auto baseNewBlock = dynamic_pointer_move<NewBlock>(baseBlock);
  if (baseNewBlock != none) {
    if(!(*baseNewBlock)->alreadyExistsInBaseStore()) {
      --_numNewBlocks;
    }
    (*baseNewBlock)->remove();
  } else {
    _baseBlockStore->remove(std::move(baseBlock));
  }
}

uint64_t CachingBlockStore::numBlocks() const {
  return _baseBlockStore->numBlocks() + _numNewBlocks;
}

void CachingBlockStore::release(unique_ref<Block> block) {
  Key key = block->key();
  _cache.push(key, std::move(block));
}

optional<unique_ref<Block>> CachingBlockStore::tryCreateInBaseStore(const Key &key, Data data) {
  auto block = _baseBlockStore->tryCreate(key, std::move(data));
  if (block != none) {
    --_numNewBlocks;
  }
  return block;
}

void CachingBlockStore::removeFromBaseStore(cpputils::unique_ref<Block> block) {
  _baseBlockStore->remove(std::move(block));
}

void CachingBlockStore::flush() {
  _cache.flush();
}

}
}
40 src/blockstore/implementations/caching/CachingBlockStore.h Normal file
@@ -0,0 +1,40 @@
#pragma once
#ifndef MESSMER_BLOCKSTORE_IMPLEMENTATIONS_CACHING_CACHINGBLOCKSTORE_H_
#define MESSMER_BLOCKSTORE_IMPLEMENTATIONS_CACHING_CACHINGBLOCKSTORE_H_

#include "cache/Cache.h"
#include "../../interface/BlockStore.h"

namespace blockstore {
namespace caching {

//TODO Check that this blockstore allows parallel destructing of blocks (otherwise we won't encrypt blocks in parallel)
class CachingBlockStore final: public BlockStore {
public:
  explicit CachingBlockStore(cpputils::unique_ref<BlockStore> baseBlockStore);

  Key createKey() override;
  boost::optional<cpputils::unique_ref<Block>> tryCreate(const Key &key, cpputils::Data data) override;
  boost::optional<cpputils::unique_ref<Block>> load(const Key &key) override;
  void remove(cpputils::unique_ref<Block> block) override;
  uint64_t numBlocks() const override;

  void release(cpputils::unique_ref<Block> block);

  boost::optional<cpputils::unique_ref<Block>> tryCreateInBaseStore(const Key &key, cpputils::Data data);
  void removeFromBaseStore(cpputils::unique_ref<Block> block);

  void flush();

private:
  cpputils::unique_ref<BlockStore> _baseBlockStore;
  Cache<Key, cpputils::unique_ref<Block>, 1000> _cache;
  uint32_t _numNewBlocks;

  DISALLOW_COPY_AND_ASSIGN(CachingBlockStore);
};

}
}

#endif
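For orientation, a minimal usage sketch of the caching layer follows. It is not part of the commit; the include paths and FakeBlockStore's default constructor are assumptions, everything else only uses calls shown in this diff.

// Hedged sketch - wraps the in-repo test fake with the caching layer added above.
#include "implementations/caching/CachingBlockStore.h"
#include "implementations/testfake/FakeBlockStore.h"   // assumed default-constructible
#include <messmer/cpp-utils/pointer/unique_ref.h>

void cachingExample() {
  using namespace blockstore;
  // All create/load/remove calls now go through the in-memory Cache (up to 1000 blocks).
  caching::CachingBlockStore store(cpputils::make_unique_ref<testfake::FakeBlockStore>());

  auto key = store.createKey();
  {
    auto block = store.tryCreate(key, cpputils::Data(512));   // starts life as a NewBlock in the cache
    if (block != boost::none) {
      (*block)->write("hi", 0, 2);
    }
  } // leaving the scope returns the block to the cache via release()
  store.flush();   // forces cached blocks out to the base store
}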
75 src/blockstore/implementations/caching/NewBlock.cpp Normal file
@@ -0,0 +1,75 @@
#include "NewBlock.h"
#include "CachingBlockStore.h"
#include <messmer/cpp-utils/assert/assert.h>
#include <messmer/cpp-utils/data/DataUtils.h>

using cpputils::Data;
using boost::none;

namespace blockstore {
namespace caching {

NewBlock::NewBlock(const Key &key, Data data, CachingBlockStore *blockStore)
  :Block(key),
   _blockStore(blockStore),
   _data(std::move(data)),
   _baseBlock(none),
   _dataChanged(true) {
}

NewBlock::~NewBlock() {
  writeToBaseBlockIfChanged();
}

const void *NewBlock::data() const {
  return _data.data();
}

void NewBlock::write(const void *source, uint64_t offset, uint64_t size) {
  ASSERT(offset <= _data.size() && offset + size <= _data.size(), "Write outside of valid area");
  std::memcpy((uint8_t*)_data.data()+offset, source, size);
  _dataChanged = true;
}

void NewBlock::writeToBaseBlockIfChanged() {
  if (_dataChanged) {
    if (_baseBlock == none) {
      //TODO _data.copy() necessary?
      auto newBase = _blockStore->tryCreateInBaseStore(key(), _data.copy());
      ASSERT(newBase != boost::none, "Couldn't create base block"); //TODO What if tryCreate fails due to a duplicate key? We should ensure we don't use duplicate keys.
      _baseBlock = std::move(*newBase);
    } else {
      (*_baseBlock)->write(_data.data(), 0, _data.size());
    }
    _dataChanged = false;
  }
}

void NewBlock::remove() {
  if (_baseBlock != none) {
    _blockStore->removeFromBaseStore(std::move(*_baseBlock));
  }
  _dataChanged = false;
}

void NewBlock::flush() {
  writeToBaseBlockIfChanged();
  ASSERT(_baseBlock != none, "At this point, the base block should already have been created but wasn't");
  (*_baseBlock)->flush();
}

size_t NewBlock::size() const {
  return _data.size();
}

void NewBlock::resize(size_t newSize) {
  _data = cpputils::DataUtils::resize(std::move(_data), newSize);
  _dataChanged = true;
}

bool NewBlock::alreadyExistsInBaseStore() const {
  return _baseBlock != none;
}

}
}
52 src/blockstore/implementations/caching/NewBlock.h Normal file
@@ -0,0 +1,52 @@
#pragma once
#ifndef MESSMER_BLOCKSTORE_IMPLEMENTATIONS_CACHING_NEWBLOCK_H_
#define MESSMER_BLOCKSTORE_IMPLEMENTATIONS_CACHING_NEWBLOCK_H_

#include "../../interface/BlockStore.h"
#include <messmer/cpp-utils/data/Data.h>

#include "messmer/cpp-utils/macros.h"
#include <memory>

namespace blockstore {
namespace caching {
class CachingBlockStore;

//TODO Does it make sense to write a general DataBackedBlock that just stores a Data object and maps the block operations to it?
//     Can we reuse that object somewhere else?
//     Maybe a second abstract class for BlockRefBackedBlock?

// This is a block that was created in CachingBlockStore, but doesn't exist in the base block store yet.
// It only exists in the cache and it is created in the base block store when destructed.
class NewBlock final: public Block {
public:
  NewBlock(const Key &key, cpputils::Data data, CachingBlockStore *blockStore);
  ~NewBlock();

  const void *data() const override;
  void write(const void *source, uint64_t offset, uint64_t size) override;
  void flush() override;

  size_t size() const override;

  void resize(size_t newSize) override;

  void remove();

  bool alreadyExistsInBaseStore() const;

private:
  CachingBlockStore *_blockStore;
  cpputils::Data _data;
  boost::optional<cpputils::unique_ref<Block>> _baseBlock;
  bool _dataChanged;

  void writeToBaseBlockIfChanged();

  DISALLOW_COPY_AND_ASSIGN(NewBlock);
};

}
}

#endif
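A short illustration of the deferred-creation behaviour documented above; it is not part of the commit, only uses calls that appear in this diff, and the surrounding setup is assumed.

// Hedged sketch - assumes an existing CachingBlockStore as in the earlier example.
#include "implementations/caching/CachingBlockStore.h"
#include <messmer/cpp-utils/assert/assert.h>

void newBlockLifecycle(blockstore::caching::CachingBlockStore &store) {
  uint64_t before = store.numBlocks();
  {
    auto block = store.tryCreate(store.createKey(), cpputils::Data(128));
    // The block is already counted via _numNewBlocks, even though the base store hasn't seen it yet.
    ASSERT(block != boost::none && store.numBlocks() == before + 1, "NewBlock should be counted");
  }
  // Only when the cache evicts or flushes the entry does NewBlock::writeToBaseBlockIfChanged()
  // call tryCreateInBaseStore() and materialize the block in the base store.
  store.flush();
}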
1 src/blockstore/implementations/caching/cache/Cache.cpp vendored Normal file
@@ -0,0 +1 @@
#include "Cache.h"
178 src/blockstore/implementations/caching/cache/Cache.h vendored Normal file
@@ -0,0 +1,178 @@
#pragma once
#ifndef MESSMER_BLOCKSTORE_IMPLEMENTATIONS_CACHING_CACHE_CACHE_H_
#define MESSMER_BLOCKSTORE_IMPLEMENTATIONS_CACHING_CACHE_CACHE_H_

#include "CacheEntry.h"
#include "QueueMap.h"
#include "PeriodicTask.h"
#include <memory>
#include <boost/optional.hpp>
#include <future>
#include <messmer/cpp-utils/assert/assert.h>
#include <messmer/cpp-utils/lock/MutexPoolLock.h>
#include <messmer/cpp-utils/pointer/gcc_4_8_compatibility.h>

namespace blockstore {
namespace caching {

template<class Key, class Value, uint32_t MAX_ENTRIES>
class Cache final {
public:
  //TODO Current MAX_LIFETIME_SEC only considers time since the element was last pushed to the Cache. Also insert a real MAX_LIFETIME_SEC that forces resync of entries that have been pushed/popped often (e.g. the root blob)
  //TODO Experiment with good values
  static constexpr double PURGE_LIFETIME_SEC = 0.5; //When an entry has this age, it will be purged from the cache
  static constexpr double PURGE_INTERVAL = 0.5; // With this interval, we check for entries to purge
  static constexpr double MAX_LIFETIME_SEC = PURGE_LIFETIME_SEC + PURGE_INTERVAL; // This is the oldest age an entry can reach (given purging works in an ideal world, i.e. with the ideal interval and in zero time)

  Cache();
  ~Cache();

  uint32_t size() const;

  void push(const Key &key, Value value);
  boost::optional<Value> pop(const Key &key);

  void flush();

private:
  void _makeSpaceForEntry(std::unique_lock<std::mutex> *lock);
  void _deleteEntry(std::unique_lock<std::mutex> *lock);
  void _deleteOldEntriesParallel();
  void _deleteAllEntriesParallel();
  void _deleteMatchingEntriesAtBeginningParallel(std::function<bool (const CacheEntry<Key, Value> &)> matches);
  void _deleteMatchingEntriesAtBeginning(std::function<bool (const CacheEntry<Key, Value> &)> matches);
  bool _deleteMatchingEntryAtBeginning(std::function<bool (const CacheEntry<Key, Value> &)> matches);

  mutable std::mutex _mutex;
  cpputils::LockPool<Key> _currentlyFlushingEntries;
  QueueMap<Key, CacheEntry<Key, Value>> _cachedBlocks;
  std::unique_ptr<PeriodicTask> _timeoutFlusher;

  DISALLOW_COPY_AND_ASSIGN(Cache);
};

template<class Key, class Value, uint32_t MAX_ENTRIES> constexpr double Cache<Key, Value, MAX_ENTRIES>::PURGE_LIFETIME_SEC;
template<class Key, class Value, uint32_t MAX_ENTRIES> constexpr double Cache<Key, Value, MAX_ENTRIES>::PURGE_INTERVAL;
template<class Key, class Value, uint32_t MAX_ENTRIES> constexpr double Cache<Key, Value, MAX_ENTRIES>::MAX_LIFETIME_SEC;

template<class Key, class Value, uint32_t MAX_ENTRIES>
Cache<Key, Value, MAX_ENTRIES>::Cache(): _mutex(), _currentlyFlushingEntries(), _cachedBlocks(), _timeoutFlusher(nullptr) {
  //Don't initialize timeoutFlusher in the initializer list,
  //because it then might already call Cache::popOldEntries() before Cache is done constructing.
  _timeoutFlusher = std::make_unique<PeriodicTask>(std::bind(&Cache::_deleteOldEntriesParallel, this), PURGE_INTERVAL);
}

template<class Key, class Value, uint32_t MAX_ENTRIES>
Cache<Key, Value, MAX_ENTRIES>::~Cache() {
  _deleteAllEntriesParallel();
  ASSERT(_cachedBlocks.size() == 0, "Error in _deleteAllEntriesParallel()");
}

template<class Key, class Value, uint32_t MAX_ENTRIES>
boost::optional<Value> Cache<Key, Value, MAX_ENTRIES>::pop(const Key &key) {
  std::unique_lock<std::mutex> lock(_mutex);
  cpputils::MutexPoolLock<Key> lockEntryFromBeingPopped(&_currentlyFlushingEntries, key, &lock);

  auto found = _cachedBlocks.pop(key);
  if (!found) {
    return boost::none;
  }
  return found->releaseValue();
}

template<class Key, class Value, uint32_t MAX_ENTRIES>
void Cache<Key, Value, MAX_ENTRIES>::push(const Key &key, Value value) {
  std::unique_lock<std::mutex> lock(_mutex);
  ASSERT(_cachedBlocks.size() <= MAX_ENTRIES, "Cache too full");
  _makeSpaceForEntry(&lock);
  _cachedBlocks.push(key, CacheEntry<Key, Value>(std::move(value)));
}

template<class Key, class Value, uint32_t MAX_ENTRIES>
void Cache<Key, Value, MAX_ENTRIES>::_makeSpaceForEntry(std::unique_lock<std::mutex> *lock) {
  // _deleteEntry releases the lock while the Value destructor is running.
  // So we can destruct multiple entries in parallel and also call pop() or push() while doing so.
  // However, if another thread calls push() before we get the lock back, the cache is full again.
  // That's why we need the while() loop here.
  while (_cachedBlocks.size() == MAX_ENTRIES) {
    _deleteEntry(lock);
  }
  ASSERT(_cachedBlocks.size() < MAX_ENTRIES, "Removing entry from cache didn't work");
};

template<class Key, class Value, uint32_t MAX_ENTRIES>
void Cache<Key, Value, MAX_ENTRIES>::_deleteEntry(std::unique_lock<std::mutex> *lock) {
  auto key = _cachedBlocks.peekKey();
  ASSERT(key != boost::none, "There was no entry to delete");
  cpputils::MutexPoolLock<Key> lockEntryFromBeingPopped(&_currentlyFlushingEntries, *key);
  auto value = _cachedBlocks.pop();
  // Call destructor outside of the unique_lock,
  // i.e. pop() and push() can be called here, except for pop() on the element in _currentlyFlushingEntries
  lock->unlock();
  value = boost::none; // Call destructor
  lock->lock();
};

template<class Key, class Value, uint32_t MAX_ENTRIES>
void Cache<Key, Value, MAX_ENTRIES>::_deleteAllEntriesParallel() {
  return _deleteMatchingEntriesAtBeginningParallel([] (const CacheEntry<Key, Value> &) {
    return true;
  });
}

template<class Key, class Value, uint32_t MAX_ENTRIES>
void Cache<Key, Value, MAX_ENTRIES>::_deleteOldEntriesParallel() {
  return _deleteMatchingEntriesAtBeginningParallel([] (const CacheEntry<Key, Value> &entry) {
    return entry.ageSeconds() > PURGE_LIFETIME_SEC;
  });
}

template<class Key, class Value, uint32_t MAX_ENTRIES>
void Cache<Key, Value, MAX_ENTRIES>::_deleteMatchingEntriesAtBeginningParallel(std::function<bool (const CacheEntry<Key, Value> &)> matches) {
  // Twice the number of cores, so we use full CPU even if half the threads are doing I/O
  unsigned int numThreads = 2 * std::max(1u, std::thread::hardware_concurrency());
  std::vector<std::future<void>> waitHandles;
  for (unsigned int i = 0; i < numThreads; ++i) {
    waitHandles.push_back(std::async(std::launch::async, [this, matches] {
      _deleteMatchingEntriesAtBeginning(matches);
    }));
  }
  for (auto & waitHandle : waitHandles) {
    waitHandle.wait();
  }
};

template<class Key, class Value, uint32_t MAX_ENTRIES>
void Cache<Key, Value, MAX_ENTRIES>::_deleteMatchingEntriesAtBeginning(std::function<bool (const CacheEntry<Key, Value> &)> matches) {
  while (_deleteMatchingEntryAtBeginning(matches)) {}
}

template<class Key, class Value, uint32_t MAX_ENTRIES>
bool Cache<Key, Value, MAX_ENTRIES>::_deleteMatchingEntryAtBeginning(std::function<bool (const CacheEntry<Key, Value> &)> matches) {
  // This function can be called in parallel by multiple threads and will then cause the Value destructors
  // to be called in parallel. The call to _deleteEntry() releases the lock while the Value destructor is running.
  std::unique_lock<std::mutex> lock(_mutex);
  if (_cachedBlocks.size() > 0 && matches(*_cachedBlocks.peek())) {
    _deleteEntry(&lock);
    return true;
  } else {
    return false;
  }
};

template<class Key, class Value, uint32_t MAX_ENTRIES>
uint32_t Cache<Key, Value, MAX_ENTRIES>::size() const {
  std::unique_lock<std::mutex> lock(_mutex);
  return _cachedBlocks.size();
};

template<class Key, class Value, uint32_t MAX_ENTRIES>
void Cache<Key, Value, MAX_ENTRIES>::flush() {
  //TODO Test flush()
  return _deleteAllEntriesParallel();
};

}
}

#endif
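A small usage sketch of the Cache template above; it is not part of the commit and the key/value types are arbitrary examples.

// Hedged sketch: a Cache keyed by int holding std::string values, capacity 10.
#include "Cache.h"
#include <string>
#include <iostream>

void cacheExample() {
  using blockstore::caching::Cache;
  Cache<int, std::string, 10> cache;

  cache.push(1, "first");
  cache.push(2, "second");          // pushing a key that is already cached would throw via QueueMap

  auto hit = cache.pop(1);          // removes and returns the entry if it is still cached
  if (hit != boost::none) {
    std::cout << *hit << std::endl; // prints "first"
  }
  // Entries older than PURGE_LIFETIME_SEC (0.5s) are evicted by the background PeriodicTask,
  // so a pop() roughly a second later will usually return boost::none.
}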
1 src/blockstore/implementations/caching/cache/CacheEntry.cpp vendored Normal file
@@ -0,0 +1 @@
#include "CacheEntry.h"
44 src/blockstore/implementations/caching/cache/CacheEntry.h vendored Normal file
@@ -0,0 +1,44 @@
#pragma once
#ifndef MESSMER_BLOCKSTORE_IMPLEMENTATIONS_CACHING_CACHE_CACHEENTRY_H_
#define MESSMER_BLOCKSTORE_IMPLEMENTATIONS_CACHING_CACHE_CACHEENTRY_H_

#include <ctime>
#include <memory>
#include <messmer/cpp-utils/macros.h>
#include <boost/date_time/posix_time/posix_time_types.hpp>

namespace blockstore {
namespace caching {

template<class Key, class Value>
class CacheEntry final {
public:
  explicit CacheEntry(Value value): _lastAccess(currentTime()), _value(std::move(value)) {
  }

  CacheEntry(CacheEntry &&) = default;

  double ageSeconds() const {
    return ((double)(currentTime() - _lastAccess).total_nanoseconds()) / ((double)1000000000);
  }

  Value releaseValue() {
    return std::move(_value);
  }

private:
  boost::posix_time::ptime _lastAccess;
  Value _value;

  static boost::posix_time::ptime currentTime() {
    return boost::posix_time::microsec_clock::local_time();
  }

  DISALLOW_COPY_AND_ASSIGN(CacheEntry);
};

}
}


#endif
27 src/blockstore/implementations/caching/cache/PeriodicTask.cpp vendored Normal file
@@ -0,0 +1,27 @@
#include "PeriodicTask.h"
#include <messmer/cpp-utils/logging/logging.h>

using std::function;
using std::endl;
using namespace cpputils::logging;

namespace blockstore {
namespace caching {

PeriodicTask::PeriodicTask(function<void ()> task, double intervalSec) :
  _task(task),
  _interval((uint64_t)(UINT64_C(1000000000) * intervalSec)),
  _thread(std::bind(&PeriodicTask::_loopIteration, this)) {
  _thread.start();
}

bool PeriodicTask::_loopIteration() {
  //Has to be boost::this_thread::sleep_for and not std::this_thread::sleep_for, because it has to be interruptible.
  //LoopThread will interrupt this method if it has to be restarted.
  boost::this_thread::sleep_for(_interval);
  _task();
  return true; // Run another iteration (don't terminate thread)
}

}
}
32 src/blockstore/implementations/caching/cache/PeriodicTask.h vendored Normal file
@@ -0,0 +1,32 @@
#pragma once
#ifndef MESSMER_BLOCKSTORE_IMPLEMENTATIONS_CACHING_CACHE_PERIODICTASK_H_
#define MESSMER_BLOCKSTORE_IMPLEMENTATIONS_CACHING_CACHE_PERIODICTASK_H_

#include <functional>
#include <messmer/cpp-utils/thread/LoopThread.h>
#include <boost/chrono.hpp>

namespace blockstore {
namespace caching {

class PeriodicTask final {
public:
  PeriodicTask(std::function<void ()> task, double intervalSec);

private:
  bool _loopIteration();

  std::function<void()> _task;
  boost::chrono::nanoseconds _interval;

  //This member has to be last, so the thread is destructed first. Otherwise the thread might access elements from a
  //partly destructed PeriodicTask.
  cpputils::LoopThread _thread;

  DISALLOW_COPY_AND_ASSIGN(PeriodicTask);
};

}
}

#endif
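A minimal sketch of PeriodicTask; it is not part of the commit and uses only the constructor declared above.

// Hedged sketch: run a lambda roughly once per second until the task object is destructed.
#include "PeriodicTask.h"
#include <atomic>
#include <chrono>
#include <thread>
#include <iostream>

void periodicTaskExample() {
  std::atomic<int> counter(0);
  {
    blockstore::caching::PeriodicTask task([&counter] {
      ++counter;                    // called from the LoopThread after each 1s sleep
    }, 1.0);
    std::this_thread::sleep_for(std::chrono::seconds(3));
  } // destroying the task destroys the LoopThread member, which stops the loop (per the header comment above)
  std::cout << "ran " << counter << " times" << std::endl;
}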
1 src/blockstore/implementations/caching/cache/QueueMap.cpp vendored Normal file
@@ -0,0 +1 @@
#include "QueueMap.h"
121 src/blockstore/implementations/caching/cache/QueueMap.h vendored Normal file
@@ -0,0 +1,121 @@
#pragma once
#ifndef MESSMER_BLOCKSTORE_IMPLEMENTATIONS_CACHING_CACHE_QUEUEMAP_H_
#define MESSMER_BLOCKSTORE_IMPLEMENTATIONS_CACHING_CACHE_QUEUEMAP_H_

#include <memory>
#include <unordered_map>
#include <cassert>
#include <boost/optional.hpp>
#include <messmer/cpp-utils/macros.h>
#include <messmer/cpp-utils/assert/assert.h>

namespace blockstore {
namespace caching {

//TODO FreeList for performance (malloc is expensive)
//TODO Single linked list with pointer to last element (for insertion) should be enough for a queue. No double linked list needed.
//     But then, popping arbitrary elements needs to be rewritten so that _removeFromQueue() is _removeSuccessorFromQueue()
//     and the map doesn't store the element itself, but its predecessor. That is, popping might be a bit slower. Test with experiments!

// A class that is a queue and a map at the same time. We could also see it as an addressable queue.
template<class Key, class Value>
class QueueMap final {
public:
  QueueMap(): _entries(), _sentinel(&_sentinel, &_sentinel) {
  }
  ~QueueMap() {
    for (auto &entry : _entries) {
      entry.second.release();
    }
  }

  void push(const Key &key, Value value) {
    auto newEntry = _entries.emplace(std::piecewise_construct, std::forward_as_tuple(key), std::forward_as_tuple(_sentinel.prev, &_sentinel));
    if (!newEntry.second) {
      throw std::logic_error("There is already an element with this key");
    }
    newEntry.first->second.init(&newEntry.first->first, std::move(value));
    //The following is ok, because std::unordered_map never invalidates pointers to its entries
    _sentinel.prev->next = &newEntry.first->second;
    _sentinel.prev = &newEntry.first->second;
  }

  boost::optional<Value> pop(const Key &key) {
    auto found = _entries.find(key);
    if (found == _entries.end()) {
      return boost::none;
    }
    _removeFromQueue(found->second);
    auto value = found->second.release();
    _entries.erase(found);
    return std::move(value);
  }

  boost::optional<Value> pop() {
    if(_sentinel.next == &_sentinel) {
      return boost::none;
    }
    return pop(*_sentinel.next->key);
  }

  boost::optional<const Key &> peekKey() {
    if(_sentinel.next == &_sentinel) {
      return boost::none;
    }
    return *_sentinel.next->key;
  }

  boost::optional<const Value &> peek() {
    if(_sentinel.next == &_sentinel) {
      return boost::none;
    }
    return _sentinel.next->value();
  }

  uint32_t size() const {
    return _entries.size();
  }

private:
  class Entry final {
  public:
    Entry(Entry *prev_, Entry *next_): prev(prev_), next(next_), key(nullptr), __value() {
    }
    void init(const Key *key_, Value value_) {
      key = key_;
      new(__value) Value(std::move(value_));
    }
    Value release() {
      Value value = std::move(*_value());
      _value()->~Value();
      return value;
    }
    const Value &value() {
      return *_value();
    }
    Entry *prev;
    Entry *next;
    const Key *key;
  private:
    Value *_value() {
      return reinterpret_cast<Value*>(__value);
    }
    alignas(Value) char __value[sizeof(Value)];
    DISALLOW_COPY_AND_ASSIGN(Entry);
  };

  void _removeFromQueue(const Entry &entry) {
    entry.prev->next = entry.next;
    entry.next->prev = entry.prev;
  }

  std::unordered_map<Key, Entry> _entries;
  Entry _sentinel;

  DISALLOW_COPY_AND_ASSIGN(QueueMap);
};

}
}

#endif
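A small sketch of the addressable queue above; it is not part of the commit and the types are arbitrary examples.

// Hedged sketch: QueueMap keeps insertion (FIFO) order but also allows popping by key.
#include "QueueMap.h"
#include <string>
#include <iostream>

void queueMapExample() {
  blockstore::caching::QueueMap<std::string, int> map;
  map.push("a", 1);
  map.push("b", 2);
  map.push("c", 3);

  auto byKey = map.pop("b");        // addressable: removes "b" from the middle of the queue
  auto oldest = map.pop();          // queue-like: removes the oldest remaining entry ("a")

  if (byKey != boost::none && oldest != boost::none) {
    std::cout << *byKey << " " << *oldest << std::endl;  // prints "2 1"
  }
  std::cout << map.size() << std::endl;                  // 1 entry ("c") left
}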
@@ -0,0 +1 @@
#include "CompressedBlock.h"
127 src/blockstore/implementations/compressing/CompressedBlock.h Normal file
@@ -0,0 +1,127 @@
#pragma once
#ifndef MESSMER_BLOCKSTORE_IMPLEMENTATIONS_COMPRESSING_COMPRESSEDBLOCK_H_
#define MESSMER_BLOCKSTORE_IMPLEMENTATIONS_COMPRESSING_COMPRESSEDBLOCK_H_

#include "../../interface/Block.h"
#include "../../interface/BlockStore.h"
#include <messmer/cpp-utils/data/DataUtils.h>
#include <messmer/cpp-utils/pointer/unique_ref.h>
#include <mutex>

namespace blockstore {
class BlockStore;
namespace compressing {
template<class Compressor> class CompressingBlockStore;

template<class Compressor>
class CompressedBlock final: public Block {
public:
  static boost::optional<cpputils::unique_ref<CompressedBlock>> TryCreateNew(BlockStore *baseBlockStore, const Key &key, cpputils::Data decompressedData);
  static cpputils::unique_ref<CompressedBlock> Decompress(cpputils::unique_ref<Block> baseBlock);

  CompressedBlock(cpputils::unique_ref<Block> baseBlock, cpputils::Data decompressedData);
  ~CompressedBlock();

  const void *data() const override;
  void write(const void *source, uint64_t offset, uint64_t size) override;

  void flush() override;

  size_t size() const override;
  void resize(size_t newSize) override;

  cpputils::unique_ref<Block> releaseBaseBlock();

private:
  void _compressToBaseBlock();

  cpputils::unique_ref<Block> _baseBlock;
  cpputils::Data _decompressedData;
  std::mutex _mutex;
  bool _dataChanged;

  DISALLOW_COPY_AND_ASSIGN(CompressedBlock);
};

template<class Compressor>
boost::optional<cpputils::unique_ref<CompressedBlock<Compressor>>> CompressedBlock<Compressor>::TryCreateNew(BlockStore *baseBlockStore, const Key &key, cpputils::Data decompressedData) {
  cpputils::Data compressed = Compressor::Compress(decompressedData);
  auto baseBlock = baseBlockStore->tryCreate(key, std::move(compressed));
  if (baseBlock == boost::none) {
    //TODO Test this code branch
    return boost::none;
  }

  return cpputils::make_unique_ref<CompressedBlock<Compressor>>(std::move(*baseBlock), std::move(decompressedData));
}

template<class Compressor>
cpputils::unique_ref<CompressedBlock<Compressor>> CompressedBlock<Compressor>::Decompress(cpputils::unique_ref<Block> baseBlock) {
  cpputils::Data decompressed = Compressor::Decompress((byte*)baseBlock->data(), baseBlock->size());
  return cpputils::make_unique_ref<CompressedBlock<Compressor>>(std::move(baseBlock), std::move(decompressed));
}

template<class Compressor>
CompressedBlock<Compressor>::CompressedBlock(cpputils::unique_ref<Block> baseBlock, cpputils::Data decompressedData)
  : Block(baseBlock->key()),
    _baseBlock(std::move(baseBlock)),
    _decompressedData(std::move(decompressedData)),
    _dataChanged(false) {
}

template<class Compressor>
CompressedBlock<Compressor>::~CompressedBlock() {
  std::unique_lock<std::mutex> lock(_mutex);
  _compressToBaseBlock();
}

template<class Compressor>
const void *CompressedBlock<Compressor>::data() const {
  return _decompressedData.data();
}

template<class Compressor>
void CompressedBlock<Compressor>::write(const void *source, uint64_t offset, uint64_t size) {
  std::memcpy((uint8_t*)_decompressedData.dataOffset(offset), source, size);
  _dataChanged = true;
}

template<class Compressor>
void CompressedBlock<Compressor>::flush() {
  std::unique_lock<std::mutex> lock(_mutex);
  _compressToBaseBlock();
  return _baseBlock->flush();
}

template<class Compressor>
size_t CompressedBlock<Compressor>::size() const {
  return _decompressedData.size();
}

template<class Compressor>
void CompressedBlock<Compressor>::resize(size_t newSize) {
  _decompressedData = cpputils::DataUtils::resize(std::move(_decompressedData), newSize);
  _dataChanged = true;
}

template<class Compressor>
cpputils::unique_ref<Block> CompressedBlock<Compressor>::releaseBaseBlock() {
  std::unique_lock<std::mutex> lock(_mutex);
  _compressToBaseBlock();
  return std::move(_baseBlock);
}

template<class Compressor>
void CompressedBlock<Compressor>::_compressToBaseBlock() {
  if (_dataChanged) {
    cpputils::Data compressed = Compressor::Compress(_decompressedData);
    _baseBlock->resize(compressed.size());
    _baseBlock->write(compressed.data(), 0, compressed.size());
    _dataChanged = false;
  }
}

}
}

#endif
@@ -0,0 +1 @@
#include "CompressingBlockStore.h"
@@ -0,0 +1,77 @@
#pragma once
#ifndef MESSMER_BLOCKSTORE_IMPLEMENTATIONS_COMPRESSING_COMPRESSINGBLOCKSTORE_H_
#define MESSMER_BLOCKSTORE_IMPLEMENTATIONS_COMPRESSING_COMPRESSINGBLOCKSTORE_H_

#include "../../interface/BlockStore.h"
#include "CompressedBlock.h"

namespace blockstore {
namespace compressing {

template<class Compressor>
class CompressingBlockStore final: public BlockStore {
public:
  CompressingBlockStore(cpputils::unique_ref<BlockStore> baseBlockStore);
  ~CompressingBlockStore();

  Key createKey() override;
  boost::optional<cpputils::unique_ref<Block>> tryCreate(const Key &key, cpputils::Data data) override;
  boost::optional<cpputils::unique_ref<Block>> load(const Key &key) override;
  void remove(cpputils::unique_ref<Block> block) override;
  uint64_t numBlocks() const override;

private:
  cpputils::unique_ref<BlockStore> _baseBlockStore;

  DISALLOW_COPY_AND_ASSIGN(CompressingBlockStore);
};

template<class Compressor>
CompressingBlockStore<Compressor>::CompressingBlockStore(cpputils::unique_ref<BlockStore> baseBlockStore)
  : _baseBlockStore(std::move(baseBlockStore)) {
}

template<class Compressor>
CompressingBlockStore<Compressor>::~CompressingBlockStore() {
}

template<class Compressor>
Key CompressingBlockStore<Compressor>::createKey() {
  return _baseBlockStore->createKey();
}

template<class Compressor>
boost::optional<cpputils::unique_ref<Block>> CompressingBlockStore<Compressor>::tryCreate(const Key &key, cpputils::Data data) {
  auto result = CompressedBlock<Compressor>::TryCreateNew(_baseBlockStore.get(), key, std::move(data));
  if (result == boost::none) {
    return boost::none;
  }
  return cpputils::unique_ref<Block>(std::move(*result));
}

template<class Compressor>
boost::optional<cpputils::unique_ref<Block>> CompressingBlockStore<Compressor>::load(const Key &key) {
  auto loaded = _baseBlockStore->load(key);
  if (loaded == boost::none) {
    return boost::none;
  }
  return boost::optional<cpputils::unique_ref<Block>>(CompressedBlock<Compressor>::Decompress(std::move(*loaded)));
}

template<class Compressor>
void CompressingBlockStore<Compressor>::remove(cpputils::unique_ref<Block> block) {
  auto _block = cpputils::dynamic_pointer_move<CompressedBlock<Compressor>>(block);
  ASSERT(_block != boost::none, "Wrong block type");
  auto baseBlock = (*_block)->releaseBaseBlock();
  return _baseBlockStore->remove(std::move(baseBlock));
}

template<class Compressor>
uint64_t CompressingBlockStore<Compressor>::numBlocks() const {
  return _baseBlockStore->numBlocks();
}

}
}

#endif
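A sketch of how the compressing layer composes over another block store; it is not part of the commit, and the include paths plus InMemoryBlockStore's default constructor are assumptions.

// Hedged sketch: compress blocks with Gzip before they reach the underlying store.
#include "implementations/compressing/CompressingBlockStore.h"
#include "implementations/compressing/compressors/Gzip.h"
#include "implementations/inmemory/InMemoryBlockStore.h"   // assumed default-constructible
#include <messmer/cpp-utils/pointer/unique_ref.h>

void compressingExample() {
  using namespace blockstore;
  compressing::CompressingBlockStore<compressing::Gzip> store(
      cpputils::make_unique_ref<inmemory::InMemoryBlockStore>());

  auto block = store.tryCreate(store.createKey(), cpputils::Data(4096));
  // The returned block exposes the uncompressed data; CompressedBlock::_compressToBaseBlock()
  // re-compresses and writes it to the base store on flush() or destruction.
  if (block != boost::none) {
    (*block)->flush();
  }
}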
@@ -0,0 +1,29 @@
#include "Gzip.h"
#include <cryptopp/cryptopp/gzip.h>

using cpputils::Data;

namespace blockstore {
namespace compressing {

Data Gzip::Compress(const Data &data) {
  CryptoPP::Gzip zipper;
  zipper.Put((byte *) data.data(), data.size());
  zipper.MessageEnd();
  Data compressed(zipper.MaxRetrievable());
  zipper.Get((byte *) compressed.data(), compressed.size());
  return compressed;
}

Data Gzip::Decompress(const void *data, size_t size) {
  //TODO Change interface to taking cpputils::Data objects (needs changing blockstore so we can read their "class Data", because this is called from CompressedBlock::Decompress()).
  CryptoPP::Gunzip zipper;
  zipper.Put((byte *) data, size);
  zipper.MessageEnd();
  Data decompressed(zipper.MaxRetrievable());
  zipper.Get((byte *) decompressed.data(), decompressed.size());
  return decompressed;
}

}
}
@@ -0,0 +1,18 @@
#pragma once
#ifndef MESSMER_BLOCKSTORE_IMPLEMENTATIONS_COMPRESSING_COMPRESSORS_GZIP_H
#define MESSMER_BLOCKSTORE_IMPLEMENTATIONS_COMPRESSING_COMPRESSORS_GZIP_H

#include <messmer/cpp-utils/data/Data.h>

namespace blockstore {
namespace compressing {
class Gzip {
public:
  static cpputils::Data Compress(const cpputils::Data &data);

  static cpputils::Data Decompress(const void *data, size_t size);
};
}
}

#endif
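A round-trip sketch for the Gzip compressor declared above; it is not part of the commit and uses only the two static functions shown here plus the cpputils::Data calls that appear elsewhere in this diff.

// Hedged sketch: compress and decompress a buffer, then verify the round trip.
#include "Gzip.h"
#include <cstring>
#include <cassert>

void gzipRoundTrip() {
  using blockstore::compressing::Gzip;

  cpputils::Data original(1000);
  std::memset(original.data(), 'x', original.size());   // highly compressible input

  cpputils::Data compressed = Gzip::Compress(original);
  cpputils::Data decompressed = Gzip::Decompress(compressed.data(), compressed.size());

  assert(decompressed.size() == original.size());
  assert(0 == std::memcmp(decompressed.data(), original.data(), original.size()));
}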
@@ -0,0 +1,139 @@
#include "RunLengthEncoding.h"
#include <sstream>
#include <messmer/cpp-utils/assert/assert.h>

using cpputils::Data;
using std::string;
using std::ostringstream;
using std::istringstream;

namespace blockstore {
namespace compressing {

// Alternatively store a run of arbitrary bytes and a run of identical bytes.
// Each run is preceded by its length. Length fields are uint16_t.
// Example: 2 - 5 - 8 - 10 - 3 - 0 - 2 - 0
// Length 2 arbitrary bytes (values: 5, 8), the next 10 bytes store "3" each,
// then 0 arbitrary bytes and 2x "0".

Data RunLengthEncoding::Compress(const Data &data) {
  ostringstream compressed;
  uint8_t *current = (uint8_t*)data.data();
  uint8_t *end = (uint8_t*)data.data()+data.size();
  while (current < end) {
    _encodeArbitraryWords(&current, end, &compressed);
    ASSERT(current <= end, "Overflow");
    if (current == end) {
      break;
    }
    _encodeIdenticalWords(&current, end, &compressed);
    ASSERT(current <= end, "Overflow");
  }
  return _extractData(&compressed);
}

void RunLengthEncoding::_encodeArbitraryWords(uint8_t **current, uint8_t* end, ostringstream *output) {
  uint16_t size = _arbitraryRunLength(*current, end);
  output->write((const char*)&size, sizeof(uint16_t));
  output->write((const char*)*current, size);
  *current += size;
}

uint16_t RunLengthEncoding::_arbitraryRunLength(uint8_t *start, uint8_t* end) {
  // Each stopping of an arbitrary bytes run costs us 5 byte, because we have to store the length
  // for the identical bytes run (2 byte), the identical byte itself (1 byte) and the length for the next arbitrary bytes run (2 byte).
  // So to get an advantage from stopping an arbitrary bytes run, at least 6 bytes have to be identical.

  // realEnd avoids an overflow of the 16bit counter
  uint8_t *realEnd = std::min(end, start + std::numeric_limits<uint16_t>::max());

  // Count the number of identical bytes and return if it finds a run of more than 6 identical bytes.
  uint8_t lastByte = *start + 1; // Something different from the first byte
  uint8_t numIdenticalBytes = 1;
  for(uint8_t *current = start; current != realEnd; ++current) {
    if (*current == lastByte) {
      ++numIdenticalBytes;
      if (numIdenticalBytes == 6) {
        return current - start - 5; //-5, because the end pointer for the arbitrary byte run should point to the first identical byte, not the one before.
      }
    } else {
      numIdenticalBytes = 1;
    }
    lastByte = *current;
  }
  //It wasn't worth stopping the arbitrary bytes run anywhere. The whole region should be an arbitrary run.
  return realEnd-start;
}

void RunLengthEncoding::_encodeIdenticalWords(uint8_t **current, uint8_t* end, ostringstream *output) {
  uint16_t size = _countIdenticalBytes(*current, end);
  output->write((const char*)&size, sizeof(uint16_t));
  output->write((const char*)*current, 1);
  *current += size;
}

uint16_t RunLengthEncoding::_countIdenticalBytes(uint8_t *start, uint8_t *end) {
  uint8_t *realEnd = std::min(end, start + std::numeric_limits<uint16_t>::max()); // This prevents overflow of the 16bit counter
  for (uint8_t *current = start+1; current != realEnd; ++current) {
    if (*current != *start) {
      return current-start;
    }
  }
  // All bytes have been identical
  return realEnd - start;
}

Data RunLengthEncoding::_extractData(ostringstream *stream) {
  string str = stream->str();
  Data data(str.size());
  std::memcpy(data.data(), str.c_str(), str.size());
  return data;
}

Data RunLengthEncoding::Decompress(const void *data, size_t size) {
  istringstream stream;
  _parseData((uint8_t*)data, size, &stream);
  ostringstream decompressed;
  while(_hasData(&stream)) {
    _decodeArbitraryWords(&stream, &decompressed);
    if (!_hasData(&stream)) {
      break;
    }
    _decodeIdenticalWords(&stream, &decompressed);
  }
  return _extractData(&decompressed);
}

bool RunLengthEncoding::_hasData(istringstream *str) {
  str->peek();
  return !str->eof();
}

void RunLengthEncoding::_parseData(const uint8_t *data, size_t size, istringstream *result) {
  result->str(string((const char*)data, size));
}

void RunLengthEncoding::_decodeArbitraryWords(istringstream *stream, ostringstream *decompressed) {
  uint16_t size;
  stream->read((char*)&size, sizeof(uint16_t));
  ASSERT(stream->good(), "Premature end of stream");
  Data run(size);
  stream->read((char*)run.data(), size);
  ASSERT(stream->good(), "Premature end of stream");
  decompressed->write((const char*)run.data(), run.size());
}

void RunLengthEncoding::_decodeIdenticalWords(istringstream *stream, ostringstream *decompressed) {
  uint16_t size;
  stream->read((char*)&size, sizeof(uint16_t));
  ASSERT(stream->good(), "Premature end of stream");
  uint8_t value;
  stream->read((char*)&value, 1);
  ASSERT(stream->good(), "Premature end of stream");
  Data run(size);
  std::memset(run.data(), value, run.size());
  decompressed->write((const char*)run.data(), run.size());
}

}
}
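A round-trip sketch for the run-length encoder above; it is not part of the commit, exercises the "few arbitrary bytes followed by a long identical run" case the format comment describes, and uses only calls that appear in this diff.

// Hedged sketch: compress and decompress a buffer dominated by one repeated byte.
#include "RunLengthEncoding.h"
#include <cstring>
#include <cassert>

void rleRoundTrip() {
  using blockstore::compressing::RunLengthEncoding;

  // 2 arbitrary bytes followed by 1000 identical bytes.
  cpputils::Data original(1002);
  ((uint8_t*)original.data())[0] = 5;
  ((uint8_t*)original.data())[1] = 8;
  std::memset((uint8_t*)original.data() + 2, 3, 1000);

  cpputils::Data compressed = RunLengthEncoding::Compress(original);
  assert(compressed.size() < original.size());   // the 1000 identical bytes collapse to one length+value record

  cpputils::Data decompressed = RunLengthEncoding::Decompress(compressed.data(), compressed.size());
  assert(decompressed.size() == original.size());
  assert(0 == std::memcmp(decompressed.data(), original.data(), original.size()));
}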
@@ -0,0 +1,29 @@
#pragma once
#ifndef MESSMER_BLOCKSTORE_IMPLEMENTATIONS_COMPRESSING_COMPRESSORS_RUNLENGTHENCODING_H
#define MESSMER_BLOCKSTORE_IMPLEMENTATIONS_COMPRESSING_COMPRESSORS_RUNLENGTHENCODING_H

#include <messmer/cpp-utils/data/Data.h>

namespace blockstore {
namespace compressing {
class RunLengthEncoding {
public:
  static cpputils::Data Compress(const cpputils::Data &data);

  static cpputils::Data Decompress(const void *data, size_t size);

private:
  static void _encodeArbitraryWords(uint8_t **current, uint8_t* end, std::ostringstream *output);
  static uint16_t _arbitraryRunLength(uint8_t *start, uint8_t* end);
  static void _encodeIdenticalWords(uint8_t **current, uint8_t* end, std::ostringstream *output);
  static uint16_t _countIdenticalBytes(uint8_t *start, uint8_t *end);
  static bool _hasData(std::istringstream *stream);
  static cpputils::Data _extractData(std::ostringstream *stream);
  static void _parseData(const uint8_t *data, size_t size, std::istringstream *result);
  static void _decodeArbitraryWords(std::istringstream *stream, std::ostringstream *decompressed);
  static void _decodeIdenticalWords(std::istringstream *stream, std::ostringstream *decompressed);
};
}
}

#endif
@@ -0,0 +1 @@
#include "EncryptedBlock.h"
177 src/blockstore/implementations/encrypted/EncryptedBlock.h Normal file
@@ -0,0 +1,177 @@
#pragma once
#ifndef MESSMER_BLOCKSTORE_IMPLEMENTATIONS_ENCRYPTED_ENCRYPTEDBLOCK_H_
#define MESSMER_BLOCKSTORE_IMPLEMENTATIONS_ENCRYPTED_ENCRYPTEDBLOCK_H_

#include "../../interface/Block.h"
#include <messmer/cpp-utils/data/Data.h>
#include "../../interface/BlockStore.h"

#include "messmer/cpp-utils/macros.h"
#include <memory>
#include <iostream>
#include <boost/optional.hpp>
#include <messmer/cpp-utils/crypto/symmetric/Cipher.h>
#include <messmer/cpp-utils/assert/assert.h>
#include <messmer/cpp-utils/data/DataUtils.h>
#include <mutex>
#include <messmer/cpp-utils/logging/logging.h>

namespace blockstore {
namespace encrypted {
template<class Cipher> class EncryptedBlockStore;

//TODO Test EncryptedBlock

//TODO Fix mutexes & locks (basically true for all blockstores)

template<class Cipher>
class EncryptedBlock final: public Block {
public:
  BOOST_CONCEPT_ASSERT((cpputils::CipherConcept<Cipher>));
  static boost::optional<cpputils