SynchronizedBlockStore allows concurrent access to the same block, but ensures that the block is only loaded once from the underlying blockstore

This commit is contained in:
Sebastian Meßmer 2015-03-31 08:02:24 -04:00
parent 41600c13f7
commit e3f7491d2d
8 changed files with 149 additions and 209 deletions

View File

@ -0,0 +1,41 @@
#include "CachedBlockRef.h"
#include "SynchronizedBlockStore.h"
using std::shared_ptr;
using std::make_unique;
using std::function;
namespace blockstore {
namespace synchronized {
CachedBlockRef::CachedBlockRef(Block *baseBlock, SynchronizedBlockStore *blockStore)
//TODO We store key twice here - once in OpenBlock, once in the underlying baseBlock.
// Should we move that to make CachedBlockRef::key() call _baseBlock.key()?
:Block(baseBlock->key()),
_baseBlock(baseBlock),
_blockStore(blockStore) {
}
CachedBlockRef::~CachedBlockRef() {
_baseBlock->flush();
_blockStore->release(_baseBlock);
}
const void *CachedBlockRef::data() const {
return _baseBlock->data();
}
void CachedBlockRef::write(const void *source, uint64_t offset, uint64_t size) {
return _baseBlock->write(source, offset, size);
}
size_t CachedBlockRef::size() const {
return _baseBlock->size();
}
void CachedBlockRef::flush() {
return _baseBlock->flush();
}
}
}

View File

@ -0,0 +1,36 @@
#pragma once
#ifndef BLOCKSTORE_IMPLEMENTATIONS_SYNCHRONIZED_CACHEDBLOCKREF_H_
#define BLOCKSTORE_IMPLEMENTATIONS_SYNCHRONIZED_CACHEDBLOCKREF_H_
#include "../../interface/Block.h"
#include "messmer/cpp-utils/macros.h"
#include <memory>
namespace blockstore {
namespace synchronized {
class SynchronizedBlockStore;
class CachedBlockRef: public Block {
public:
CachedBlockRef(Block *baseBlock, SynchronizedBlockStore *blockStore);
virtual ~CachedBlockRef();
const void *data() const override;
void write(const void *source, uint64_t offset, uint64_t size) override;
void flush() override;
size_t size() const override;
private:
Block *_baseBlock;
SynchronizedBlockStore *_blockStore;
DISALLOW_COPY_AND_ASSIGN(CachedBlockRef);
};
}
}
#endif

View File

@ -1,40 +0,0 @@
#include <messmer/blockstore/implementations/synchronized/OpenBlock.h>
#include "SynchronizedBlockStore.h"
using std::unique_ptr;
using std::make_unique;
using std::function;
namespace blockstore {
namespace synchronized {
OpenBlock::OpenBlock(unique_ptr<Block> baseBlock, OpenBlockList *openBlockList)
//TODO We store key twice here - once in OpenBlock, once in the underlying baseBlock.
// Should we move that to make OpenBlock::key() call _baseBlock.key()?
:Block(baseBlock->key()),
_baseBlock(std::move(baseBlock)),
_openBlockList(openBlockList) {
}
OpenBlock::~OpenBlock() {
_openBlockList->release(std::move(_baseBlock));
}
const void *OpenBlock::data() const {
return _baseBlock->data();
}
void OpenBlock::write(const void *source, uint64_t offset, uint64_t size) {
return _baseBlock->write(source, offset, size);
}
size_t OpenBlock::size() const {
return _baseBlock->size();
}
void OpenBlock::flush() {
return _baseBlock->flush();
}
}
}

View File

@ -1,36 +0,0 @@
#pragma once
#ifndef BLOCKSTORE_IMPLEMENTATIONS_SYNCHRONIZED_OPENBLOCK_H_
#define BLOCKSTORE_IMPLEMENTATIONS_SYNCHRONIZED_OPENBLOCK_H_
#include "../../interface/Block.h"
#include "messmer/cpp-utils/macros.h"
#include <memory>
namespace blockstore {
namespace synchronized {
class OpenBlockList;
class OpenBlock: public Block {
public:
OpenBlock(std::unique_ptr<Block> baseBlock, OpenBlockList *openBlockList);
virtual ~OpenBlock();
const void *data() const override;
void write(const void *source, uint64_t offset, uint64_t size) override;
void flush() override;
size_t size() const override;
private:
std::unique_ptr<Block> _baseBlock;
OpenBlockList *_openBlockList;
DISALLOW_COPY_AND_ASSIGN(OpenBlock);
};
}
}
#endif

View File

@ -1,80 +0,0 @@
#include "OpenBlockList.h"
#include "OpenBlock.h"
#include <cassert>
using std::unique_ptr;
using std::make_unique;
using std::function;
using std::mutex;
using std::lock_guard;
using std::unique_lock;
using std::promise;
using std::future;
namespace blockstore {
namespace synchronized {
OpenBlockList::OpenBlockList() {
}
OpenBlockList::~OpenBlockList() {
}
unique_ptr<Block> OpenBlockList::insert(unique_ptr<Block> block) {
lock_guard<mutex> lock(_mutex);
auto insertResult = _openBlocks.insert(block->key());
assert(insertResult.second == true);
return make_unique<OpenBlock>(std::move(block), this);
}
unique_ptr<Block> OpenBlockList::acquire(const Key &key, function<unique_ptr<Block> ()> loader) {
unique_lock<mutex> lock(_mutex);
auto insertResult = _openBlocks.insert(key);
auto blockWasNotOpenYet = insertResult.second;
if (blockWasNotOpenYet) {
lock.unlock();
auto block = loader();
if (block.get() == nullptr) {
return nullptr;
}
return make_unique<OpenBlock>(std::move(block), this);
} else {
auto blockFuture = _addPromiseForBlock(key);
lock.unlock();
return blockFuture.get();
}
}
future<unique_ptr<Block>> OpenBlockList::_addPromiseForBlock(const Key &key) {
auto insertResult = _wantedBlocks.emplace(key, promise<unique_ptr<Block>>());
assert(insertResult.second == true);
return insertResult.first->second.get_future();
}
void OpenBlockList::release(unique_ptr<Block> block) {
lock_guard<mutex> lock(_mutex);
auto foundWantedBlock = _wantedBlocks.find(block->key());
if (foundWantedBlock != _wantedBlocks.end()) {
foundWantedBlock->second.set_value(std::move(block));
} else {
_openBlocks.erase(block->key());
auto foundBlockToClose = _blocksToClose.find(block->key());
if (foundBlockToClose != _blocksToClose.end()) {
foundBlockToClose->second.set_value(std::move(block));
}
}
}
void OpenBlockList::close(unique_ptr<Block> block, function<void (unique_ptr<Block>)> onClose) {
unique_lock<mutex> lock(_mutex);
auto insertResult = _blocksToClose.emplace(block->key(), promise<unique_ptr<Block>>());
assert(insertResult.second == true);
block.reset();
lock.unlock();
auto closedBlock = insertResult.first->second.get_future().get();
onClose(std::move(closedBlock));
}
}
}

View File

@ -1,42 +0,0 @@
#ifndef MESSMER_BLOCKSTORE_TEST_IMPLEMENTATIONS_SYNCHRONIZED_OPENBLOCKLIST_H_
#define MESSMER_BLOCKSTORE_TEST_IMPLEMENTATIONS_SYNCHRONIZED_OPENBLOCKLIST_H_
#include <memory>
#include <set>
#include <map>
#include <vector>
#include <functional>
#include <mutex>
#include "../../utils/Key.h"
#include <future>
namespace blockstore {
class Block;
namespace synchronized {
class OpenBlockList {
public:
OpenBlockList();
virtual ~OpenBlockList();
std::unique_ptr<Block> insert(std::unique_ptr<Block> block);
std::unique_ptr<Block> acquire(const Key &key, std::function<std::unique_ptr<Block> ()> loader);
void release(std::unique_ptr<Block> block);
void close(std::unique_ptr<Block> block, std::function<void (std::unique_ptr<Block>)> onClose);
private:
std::set<Key> _openBlocks;
std::map<Key, std::promise<std::unique_ptr<Block>>> _wantedBlocks;
std::map<Key, std::promise<std::unique_ptr<Block>>> _blocksToClose;
//TODO Check whether we need this mutex or whether we can write it threadsafe without a mutex
mutable std::mutex _mutex;
std::future<std::unique_ptr<Block>> _addPromiseForBlock(const Key &key);
};
}
}
#endif

View File

@ -1,31 +1,72 @@
#include "CachedBlockRef.h"
#include "SynchronizedBlockStore.h"
#include <cassert>
using std::unique_ptr;
using std::make_unique;
using std::string;
using std::mutex;
using std::lock_guard;
using std::promise;
namespace blockstore {
namespace synchronized {
SynchronizedBlockStore::SynchronizedBlockStore(unique_ptr<BlockStore> baseBlockStore)
: _baseBlockStore(std::move(baseBlockStore)),
_openBlockList() {
_openBlocks() {
}
unique_ptr<Block> SynchronizedBlockStore::create(size_t size) {
return _openBlockList.insert(_baseBlockStore->create(size));
auto block = _baseBlockStore->create(size);
lock_guard<mutex> lock(_mutex);
return _addOpenBlock(std::move(block));
}
unique_ptr<Block> SynchronizedBlockStore::_addOpenBlock(unique_ptr<Block> block) {
auto insertResult = _openBlocks.emplace(block->key(), std::move(block));
assert(true == insertResult.second);
return make_unique<CachedBlockRef>(insertResult.first->second.getReference(), this);
}
unique_ptr<Block> SynchronizedBlockStore::load(const Key &key) {
return _openBlockList.acquire(key, [this, key] {
return _baseBlockStore->load(key);
});
lock_guard<mutex> lock(_mutex);
auto found = _openBlocks.find(key);
if (found == _openBlocks.end()) {
auto block = _baseBlockStore->load(key);
if (block.get() == nullptr) {
return nullptr;
}
return _addOpenBlock(std::move(block));
} else {
return make_unique<CachedBlockRef>(found->second.getReference(), this);
}
}
void SynchronizedBlockStore::release(const Block *block) {
lock_guard<mutex> lock(_mutex);
Key key = block->key();
auto found = _openBlocks.find(key);
assert (found != _openBlocks.end());
found->second.releaseReference();
if (found->second.refCount == 0) {
auto foundToRemove = _blocksToRemove.find(key);
if (foundToRemove != _blocksToRemove.end()) {
foundToRemove->second.set_value(std::move(found->second.block));
}
_openBlocks.erase(found);
}
}
void SynchronizedBlockStore::remove(unique_ptr<Block> block) {
_openBlockList.close(std::move(block), [this] (unique_ptr<Block> block) {
_baseBlockStore->remove(std::move(block));
});
auto insertResult = _blocksToRemove.emplace(block->key(), promise<unique_ptr<Block>>());
assert(true == insertResult.second);
block.reset();
//Wait for last block user to release it
auto blockToRemove = insertResult.first->second.get_future().get();
_baseBlockStore->remove(std::move(blockToRemove));
}
uint64_t SynchronizedBlockStore::numBlocks() const {

View File

@ -4,14 +4,16 @@
#include "messmer/cpp-utils/macros.h"
#include <memory>
#include <mutex>
#include <map>
#include <future>
#include "../../interface/BlockStore.h"
#include "OpenBlockList.h"
namespace blockstore {
namespace synchronized {
//TODO Rename to CachingBlockStore or something else
class SynchronizedBlockStore: public BlockStore {
public:
SynchronizedBlockStore(std::unique_ptr<BlockStore> baseBlockStore);
@ -21,9 +23,27 @@ public:
void remove(std::unique_ptr<Block> block) override;
uint64_t numBlocks() const override;
void release(const Block *block);
private:
struct OpenBlock {
OpenBlock(std::unique_ptr<Block> block_): block(std::move(block_)), refCount(0) {}
Block *getReference() {
++refCount;
return block.get();
}
void releaseReference() {
--refCount;
}
std::unique_ptr<Block> block;
uint32_t refCount;
};
std::unique_ptr<BlockStore> _baseBlockStore;
OpenBlockList _openBlockList;
std::map<Key, OpenBlock> _openBlocks;
std::mutex _mutex;
std::map<Key, std::promise<std::unique_ptr<Block>>> _blocksToRemove;
std::unique_ptr<Block> _addOpenBlock(std::unique_ptr<Block> block);
DISALLOW_COPY_AND_ASSIGN(SynchronizedBlockStore);
};