Merged blobstore repository

This commit is contained in:
Sebastian Messmer 2016-02-11 14:50:18 +01:00
commit 31d52d3c8d
62 changed files with 5200 additions and 2 deletions

1
.gitignore vendored
View File

@ -1,4 +1,3 @@
/build
/cmake
/.idea

View File

@ -36,5 +36,6 @@ script:
- ./run_with_fuse.sh ./test/fspp/fspp-test
- ./test/parallelaccessstore/parallelaccessstore-test
- ./test/blockstore/blockstore-test
- ./test/blobstore/blobstore-test
after_script:
- rm run_with_fuse.sh

View File

@ -2,4 +2,5 @@ include_directories(${CMAKE_CURRENT_SOURCE_DIR})
add_subdirectory(cpp-utils)
add_subdirectory(fspp)
add_subdirectory(blockstore)
add_subdirectory(blockstore)
add_subdirectory(blobstore)

View File

@ -0,0 +1,29 @@
project (blobstore)
set(SOURCES
implementations/onblocks/parallelaccessdatatreestore/ParallelAccessDataTreeStoreAdapter.cpp
implementations/onblocks/parallelaccessdatatreestore/DataTreeRef.cpp
implementations/onblocks/parallelaccessdatatreestore/ParallelAccessDataTreeStore.cpp
implementations/onblocks/utils/Math.cpp
implementations/onblocks/BlobStoreOnBlocks.cpp
implementations/onblocks/datanodestore/DataNode.cpp
implementations/onblocks/datanodestore/DataLeafNode.cpp
implementations/onblocks/datanodestore/DataInnerNode.cpp
implementations/onblocks/datanodestore/DataNodeStore.cpp
implementations/onblocks/datatreestore/impl/algorithms.cpp
implementations/onblocks/datatreestore/DataTree.cpp
implementations/onblocks/datatreestore/DataTreeStore.cpp
implementations/onblocks/BlobOnBlocks.cpp
)
add_library(${PROJECT_NAME} STATIC ${SOURCES})
# This is needed by boost thread
if(${CMAKE_SYSTEM_NAME} MATCHES "Linux")
target_link_libraries(${PROJECT_NAME} PRIVATE rt)
endif(${CMAKE_SYSTEM_NAME} MATCHES "Linux")
target_link_libraries(${PROJECT_NAME} PUBLIC cpp-utils blockstore)
target_add_boost(${PROJECT_NAME} filesystem system thread)
target_enable_style_warnings(${PROJECT_NAME})
target_activate_cpp14(${PROJECT_NAME})

View File

@ -0,0 +1,109 @@
#include "parallelaccessdatatreestore/DataTreeRef.h"
#include "BlobOnBlocks.h"
#include "datanodestore/DataLeafNode.h"
#include "utils/Math.h"
#include <cmath>
#include <messmer/cpp-utils/assert/assert.h>
using std::function;
using cpputils::unique_ref;
using cpputils::Data;
using blobstore::onblocks::datanodestore::DataLeafNode;
using blobstore::onblocks::datanodestore::DataNodeLayout;
using blockstore::Key;
namespace blobstore {
namespace onblocks {
using parallelaccessdatatreestore::DataTreeRef;
BlobOnBlocks::BlobOnBlocks(unique_ref<DataTreeRef> datatree)
: _datatree(std::move(datatree)), _sizeCache(boost::none) {
}
BlobOnBlocks::~BlobOnBlocks() {
}
uint64_t BlobOnBlocks::size() const {
if (_sizeCache == boost::none) {
_sizeCache = _datatree->numStoredBytes();
}
return *_sizeCache;
}
void BlobOnBlocks::resize(uint64_t numBytes) {
_datatree->resizeNumBytes(numBytes);
_sizeCache = numBytes;
}
void BlobOnBlocks::traverseLeaves(uint64_t beginByte, uint64_t sizeBytes, function<void (uint64_t, DataLeafNode *leaf, uint32_t, uint32_t)> func) const {
uint64_t endByte = beginByte + sizeBytes;
uint32_t firstLeaf = beginByte / _datatree->maxBytesPerLeaf();
uint32_t endLeaf = utils::ceilDivision(endByte, _datatree->maxBytesPerLeaf());
bool writingOutside = size() < endByte; // TODO Calling size() is slow because it has to traverse the tree
_datatree->traverseLeaves(firstLeaf, endLeaf, [&func, beginByte, endByte, endLeaf, writingOutside](DataLeafNode *leaf, uint32_t leafIndex) {
uint64_t indexOfFirstLeafByte = leafIndex * leaf->maxStoreableBytes();
uint32_t dataBegin = utils::maxZeroSubtraction(beginByte, indexOfFirstLeafByte);
uint32_t dataEnd = std::min(leaf->maxStoreableBytes(), endByte - indexOfFirstLeafByte);
if (leafIndex == endLeaf-1 && writingOutside) {
// If we are traversing an area that didn't exist before, then the last leaf was just created with a wrong size. We have to fix it.
leaf->resize(dataEnd);
}
func(indexOfFirstLeafByte, leaf, dataBegin, dataEnd-dataBegin);
});
if (writingOutside) {
ASSERT(_datatree->numStoredBytes() == endByte, "Writing didn't grow by the correct number of bytes");
_sizeCache = endByte;
}
}
Data BlobOnBlocks::readAll() const {
//TODO Querying size is inefficient. Is this possible without a call to size()?
uint64_t count = size();
Data result(count);
_read(result.data(), 0, count);
return result;
}
void BlobOnBlocks::read(void *target, uint64_t offset, uint64_t count) const {
ASSERT(offset <= size() && offset + count <= size(), "BlobOnBlocks::read() read outside blob. Use BlobOnBlocks::tryRead() if this should be allowed.");
uint64_t read = tryRead(target, offset, count);
ASSERT(read == count, "BlobOnBlocks::read() couldn't read all requested bytes. Use BlobOnBlocks::tryRead() if this should be allowed.");
}
uint64_t BlobOnBlocks::tryRead(void *target, uint64_t offset, uint64_t count) const {
//TODO Quite inefficient to call size() here, because that has to traverse the tree
uint64_t realCount = std::max(UINT64_C(0), std::min(count, size()-offset));
_read(target, offset, realCount);
return realCount;
}
void BlobOnBlocks::_read(void *target, uint64_t offset, uint64_t count) const {
traverseLeaves(offset, count, [target, offset] (uint64_t indexOfFirstLeafByte, const DataLeafNode *leaf, uint32_t leafDataOffset, uint32_t leafDataSize) {
//TODO Simplify formula, make it easier to understand
leaf->read((uint8_t*)target + indexOfFirstLeafByte - offset + leafDataOffset, leafDataOffset, leafDataSize);
});
}
void BlobOnBlocks::write(const void *source, uint64_t offset, uint64_t count) {
traverseLeaves(offset, count, [source, offset] (uint64_t indexOfFirstLeafByte, DataLeafNode *leaf, uint32_t leafDataOffset, uint32_t leafDataSize) {
//TODO Simplify formula, make it easier to understand
leaf->write((uint8_t*)source + indexOfFirstLeafByte - offset + leafDataOffset, leafDataOffset, leafDataSize);
});
}
void BlobOnBlocks::flush() {
_datatree->flush();
}
const Key &BlobOnBlocks::key() const {
return _datatree->key();
}
unique_ref<DataTreeRef> BlobOnBlocks::releaseTree() {
return std::move(_datatree);
}
}
}

View File

@ -0,0 +1,52 @@
#pragma once
#ifndef MESSMER_BLOBSTORE_IMPLEMENTATIONS_ONBLOCKS_BLOBONBLOCKS_H_
#define MESSMER_BLOBSTORE_IMPLEMENTATIONS_ONBLOCKS_BLOBONBLOCKS_H_
#include "../../interface/Blob.h"
#include <memory>
#include <boost/optional.hpp>
namespace blobstore {
namespace onblocks {
namespace datanodestore {
class DataLeafNode;
}
namespace parallelaccessdatatreestore {
class DataTreeRef;
}
class BlobOnBlocks final: public Blob {
public:
BlobOnBlocks(cpputils::unique_ref<parallelaccessdatatreestore::DataTreeRef> datatree);
~BlobOnBlocks();
const blockstore::Key &key() const override;
uint64_t size() const override;
void resize(uint64_t numBytes) override;
cpputils::Data readAll() const override;
void read(void *target, uint64_t offset, uint64_t size) const override;
uint64_t tryRead(void *target, uint64_t offset, uint64_t size) const override;
void write(const void *source, uint64_t offset, uint64_t size) override;
void flush() override;
cpputils::unique_ref<parallelaccessdatatreestore::DataTreeRef> releaseTree();
private:
void _read(void *target, uint64_t offset, uint64_t count) const;
void traverseLeaves(uint64_t offsetBytes, uint64_t sizeBytes, std::function<void (uint64_t, datanodestore::DataLeafNode *, uint32_t, uint32_t)>) const;
cpputils::unique_ref<parallelaccessdatatreestore::DataTreeRef> _datatree;
mutable boost::optional<uint64_t> _sizeCache;
DISALLOW_COPY_AND_ASSIGN(BlobOnBlocks);
};
}
}
#endif

View File

@ -0,0 +1,56 @@
#include "parallelaccessdatatreestore/DataTreeRef.h"
#include "parallelaccessdatatreestore/ParallelAccessDataTreeStore.h"
#include <messmer/blockstore/implementations/parallelaccess/ParallelAccessBlockStore.h>
#include "datanodestore/DataLeafNode.h"
#include "datanodestore/DataNodeStore.h"
#include "datatreestore/DataTreeStore.h"
#include "datatreestore/DataTree.h"
#include "BlobStoreOnBlocks.h"
#include "BlobOnBlocks.h"
#include <messmer/cpp-utils/pointer/cast.h>
#include <messmer/cpp-utils/assert/assert.h>
using cpputils::unique_ref;
using cpputils::make_unique_ref;
using blockstore::BlockStore;
using blockstore::parallelaccess::ParallelAccessBlockStore;
using blockstore::Key;
using cpputils::dynamic_pointer_move;
using boost::optional;
using boost::none;
namespace blobstore {
namespace onblocks {
using datanodestore::DataNodeStore;
using datatreestore::DataTreeStore;
using parallelaccessdatatreestore::ParallelAccessDataTreeStore;
BlobStoreOnBlocks::BlobStoreOnBlocks(unique_ref<BlockStore> blockStore, uint32_t blocksizeBytes)
: _dataTreeStore(make_unique_ref<ParallelAccessDataTreeStore>(make_unique_ref<DataTreeStore>(make_unique_ref<DataNodeStore>(make_unique_ref<ParallelAccessBlockStore>(std::move(blockStore)), blocksizeBytes)))) {
}
BlobStoreOnBlocks::~BlobStoreOnBlocks() {
}
unique_ref<Blob> BlobStoreOnBlocks::create() {
return make_unique_ref<BlobOnBlocks>(_dataTreeStore->createNewTree());
}
optional<unique_ref<Blob>> BlobStoreOnBlocks::load(const Key &key) {
auto tree = _dataTreeStore->load(key);
if (tree == none) {
return none;
}
return optional<unique_ref<Blob>>(make_unique_ref<BlobOnBlocks>(std::move(*tree)));
}
void BlobStoreOnBlocks::remove(unique_ref<Blob> blob) {
auto _blob = dynamic_pointer_move<BlobOnBlocks>(blob);
ASSERT(_blob != none, "Passed Blob in BlobStoreOnBlocks::remove() is not a BlobOnBlocks.");
_dataTreeStore->remove((*_blob)->releaseTree());
}
}
}

View File

@ -0,0 +1,35 @@
#pragma once
#ifndef MESSMER_BLOBSTORE_IMPLEMENTATIONS_BLOCKED_BLOBSTOREONBLOCKS_H_
#define MESSMER_BLOBSTORE_IMPLEMENTATIONS_BLOCKED_BLOBSTOREONBLOCKS_H_
#include "../../interface/BlobStore.h"
#include "messmer/blockstore/interface/BlockStore.h"
namespace blobstore {
namespace onblocks {
namespace parallelaccessdatatreestore {
class ParallelAccessDataTreeStore;
}
//TODO Make blobstore able to cope with incomplete data (some blocks missing, because they're not synchronized yet) and write test cases for that
class BlobStoreOnBlocks final: public BlobStore {
public:
BlobStoreOnBlocks(cpputils::unique_ref<blockstore::BlockStore> blockStore, uint32_t blocksizeBytes);
~BlobStoreOnBlocks();
cpputils::unique_ref<Blob> create() override;
boost::optional<cpputils::unique_ref<Blob>> load(const blockstore::Key &key) override;
void remove(cpputils::unique_ref<Blob> blob) override;
private:
cpputils::unique_ref<parallelaccessdatatreestore::ParallelAccessDataTreeStore> _dataTreeStore;
DISALLOW_COPY_AND_ASSIGN(BlobStoreOnBlocks);
};
}
}
#endif

View File

@ -0,0 +1,87 @@
#include "DataInnerNode.h"
#include "DataNodeStore.h"
#include <messmer/cpp-utils/assert/assert.h>
using blockstore::Block;
using cpputils::Data;
using cpputils::unique_ref;
using cpputils::make_unique_ref;
using blockstore::Key;
namespace blobstore {
namespace onblocks {
namespace datanodestore {
DataInnerNode::DataInnerNode(DataNodeView view)
: DataNode(std::move(view)) {
ASSERT(depth() > 0, "Inner node can't have depth 0. Is this a leaf maybe?");
}
DataInnerNode::~DataInnerNode() {
}
unique_ref<DataInnerNode> DataInnerNode::InitializeNewNode(unique_ref<Block> block, const DataNode &first_child) {
DataNodeView node(std::move(block));
node.setDepth(first_child.depth() + 1);
node.setSize(1);
auto result = make_unique_ref<DataInnerNode>(std::move(node));
result->ChildrenBegin()->setKey(first_child.key());
return result;
}
uint32_t DataInnerNode::numChildren() const {
return node().Size();
}
DataInnerNode::ChildEntry *DataInnerNode::ChildrenBegin() {
return const_cast<ChildEntry*>(const_cast<const DataInnerNode*>(this)->ChildrenBegin());
}
const DataInnerNode::ChildEntry *DataInnerNode::ChildrenBegin() const {
return node().DataBegin<ChildEntry>();
}
DataInnerNode::ChildEntry *DataInnerNode::ChildrenEnd() {
return const_cast<ChildEntry*>(const_cast<const DataInnerNode*>(this)->ChildrenEnd());
}
const DataInnerNode::ChildEntry *DataInnerNode::ChildrenEnd() const {
return ChildrenBegin() + node().Size();
}
DataInnerNode::ChildEntry *DataInnerNode::LastChild() {
return const_cast<ChildEntry*>(const_cast<const DataInnerNode*>(this)->LastChild());
}
const DataInnerNode::ChildEntry *DataInnerNode::LastChild() const {
return getChild(numChildren()-1);
}
DataInnerNode::ChildEntry *DataInnerNode::getChild(unsigned int index) {
return const_cast<ChildEntry*>(const_cast<const DataInnerNode*>(this)->getChild(index));
}
const DataInnerNode::ChildEntry *DataInnerNode::getChild(unsigned int index) const {
ASSERT(index < numChildren(), "Accessing child out of range");
return ChildrenBegin()+index;
}
void DataInnerNode::addChild(const DataNode &child) {
ASSERT(numChildren() < maxStoreableChildren(), "Adding more children than we can store");
ASSERT(child.depth() == depth()-1, "The child that should be added has wrong depth");
node().setSize(node().Size()+1);
LastChild()->setKey(child.key());
}
void DataInnerNode::removeLastChild() {
ASSERT(node().Size() > 1, "There is no child to remove");
node().setSize(node().Size()-1);
}
uint32_t DataInnerNode::maxStoreableChildren() const {
return node().layout().maxChildrenPerInnerNode();
}
}
}
}

View File

@ -0,0 +1,49 @@
#pragma once
#ifndef MESSMER_BLOBSTORE_IMPLEMENTATIONS_ONBLOCKS_DATANODESTORE_DATAINNERNODE_H_
#define MESSMER_BLOBSTORE_IMPLEMENTATIONS_ONBLOCKS_DATANODESTORE_DATAINNERNODE_H_
#include "DataNode.h"
#include "DataInnerNode_ChildEntry.h"
namespace blobstore {
namespace onblocks {
namespace datanodestore {
class DataInnerNode final: public DataNode {
public:
static cpputils::unique_ref<DataInnerNode> InitializeNewNode(cpputils::unique_ref<blockstore::Block> block, const DataNode &first_child_key);
DataInnerNode(DataNodeView block);
~DataInnerNode();
using ChildEntry = DataInnerNode_ChildEntry;
uint32_t maxStoreableChildren() const;
ChildEntry *getChild(unsigned int index);
const ChildEntry *getChild(unsigned int index) const;
uint32_t numChildren() const;
void addChild(const DataNode &child_key);
void removeLastChild();
ChildEntry *LastChild();
const ChildEntry *LastChild() const;
private:
ChildEntry *ChildrenBegin();
ChildEntry *ChildrenEnd();
const ChildEntry *ChildrenBegin() const;
const ChildEntry *ChildrenEnd() const;
DISALLOW_COPY_AND_ASSIGN(DataInnerNode);
};
}
}
}
#endif

View File

@ -0,0 +1,30 @@
#pragma once
#ifndef MESSMER_BLOBSTORE_IMPLEMENTATIONS_ONBLOCKS_DATANODESTORE_DATAINNERNODE_CHILDENTRY_H_
#define MESSMER_BLOBSTORE_IMPLEMENTATIONS_ONBLOCKS_DATANODESTORE_DATAINNERNODE_CHILDENTRY_H_
#include <messmer/cpp-utils/macros.h>
namespace blobstore{
namespace onblocks{
namespace datanodestore{
struct DataInnerNode_ChildEntry final {
public:
blockstore::Key key() const {
return blockstore::Key::FromBinary(_keydata);
}
private:
void setKey(const blockstore::Key &key) {
key.ToBinary(_keydata);
}
friend class DataInnerNode;
uint8_t _keydata[blockstore::Key::BINARY_LENGTH];
DISALLOW_COPY_AND_ASSIGN(DataInnerNode_ChildEntry);
};
}
}
}
#endif

View File

@ -0,0 +1,67 @@
#include "DataLeafNode.h"
#include "DataInnerNode.h"
#include <messmer/cpp-utils/assert/assert.h>
using blockstore::Block;
using cpputils::Data;
using blockstore::Key;
using cpputils::unique_ref;
using cpputils::make_unique_ref;
namespace blobstore {
namespace onblocks {
namespace datanodestore {
DataLeafNode::DataLeafNode(DataNodeView view)
: DataNode(std::move(view)) {
ASSERT(node().Depth() == 0, "Leaf node must have depth 0. Is it an inner node instead?");
ASSERT(numBytes() <= maxStoreableBytes(), "Leaf says it stores more bytes than it has space for");
}
DataLeafNode::~DataLeafNode() {
}
unique_ref<DataLeafNode> DataLeafNode::InitializeNewNode(unique_ref<Block> block) {
DataNodeView node(std::move(block));
node.setDepth(0);
node.setSize(0);
//fillDataWithZeroes(); not needed, because a newly created block will be zeroed out. DataLeafNodeTest.SpaceIsZeroFilledWhenGrowing ensures this.
return make_unique_ref<DataLeafNode>(std::move(node));
}
void DataLeafNode::read(void *target, uint64_t offset, uint64_t size) const {
ASSERT(offset <= node().Size() && offset + size <= node().Size(), "Read out of valid area"); // Also check offset, because the addition could lead to overflows
std::memcpy(target, (uint8_t*)node().data() + offset, size);
}
void DataLeafNode::write(const void *source, uint64_t offset, uint64_t size) {
ASSERT(offset <= node().Size() && offset + size <= node().Size(), "Write out of valid area"); // Also check offset, because the addition could lead to overflows
node().write(source, offset, size);
}
uint32_t DataLeafNode::numBytes() const {
return node().Size();
}
void DataLeafNode::resize(uint32_t new_size) {
ASSERT(new_size <= maxStoreableBytes(), "Trying to resize to a size larger than the maximal size");
uint32_t old_size = node().Size();
if (new_size < old_size) {
fillDataWithZeroesFromTo(new_size, old_size);
}
node().setSize(new_size);
}
void DataLeafNode::fillDataWithZeroesFromTo(off_t begin, off_t end) {
Data ZEROES(end-begin);
ZEROES.FillWithZeroes();
node().write(ZEROES.data(), begin, end-begin);
}
uint64_t DataLeafNode::maxStoreableBytes() const {
return node().layout().maxBytesPerLeaf();
}
}
}
}

View File

@ -0,0 +1,39 @@
#pragma once
#ifndef MESSMER_BLOBSTORE_IMPLEMENTATIONS_ONBLOCKS_DATANODESTORE_DATALEAFNODE_H_
#define MESSMER_BLOBSTORE_IMPLEMENTATIONS_ONBLOCKS_DATANODESTORE_DATALEAFNODE_H_
#include "DataNode.h"
namespace blobstore {
namespace onblocks {
namespace datanodestore {
class DataInnerNode;
class DataLeafNode final: public DataNode {
public:
static cpputils::unique_ref<DataLeafNode> InitializeNewNode(cpputils::unique_ref<blockstore::Block> block);
DataLeafNode(DataNodeView block);
~DataLeafNode();
//Returning uint64_t, because calculations handling this probably need to be done in 64bit to support >4GB blobs.
uint64_t maxStoreableBytes() const;
void read(void *target, uint64_t offset, uint64_t size) const;
void write(const void *source, uint64_t offset, uint64_t size);
uint32_t numBytes() const;
void resize(uint32_t size);
private:
void fillDataWithZeroesFromTo(off_t begin, off_t end);
DISALLOW_COPY_AND_ASSIGN(DataLeafNode);
};
}
}
}
#endif

View File

@ -0,0 +1,54 @@
#include "DataInnerNode.h"
#include "DataLeafNode.h"
#include "DataNode.h"
#include "DataNodeStore.h"
#include <messmer/blockstore/utils/BlockStoreUtils.h>
using blockstore::Block;
using blockstore::Key;
using std::runtime_error;
using cpputils::unique_ref;
namespace blobstore {
namespace onblocks {
namespace datanodestore {
DataNode::DataNode(DataNodeView node)
: _node(std::move(node)) {
}
DataNode::~DataNode() {
}
DataNodeView &DataNode::node() {
return const_cast<DataNodeView&>(const_cast<const DataNode*>(this)->node());
}
const DataNodeView &DataNode::node() const {
return _node;
}
const Key &DataNode::key() const {
return _node.key();
}
uint8_t DataNode::depth() const {
return _node.Depth();
}
unique_ref<DataInnerNode> DataNode::convertToNewInnerNode(unique_ref<DataNode> node, const DataNode &first_child) {
Key key = node->key();
auto block = node->_node.releaseBlock();
blockstore::utils::fillWithZeroes(block.get());
return DataInnerNode::InitializeNewNode(std::move(block), first_child);
}
void DataNode::flush() const {
_node.flush();
}
}
}
}

View File

@ -0,0 +1,44 @@
#pragma once
#ifndef MESSMER_BLOBSTORE_IMPLEMENTATIONS_ONBLOCKS_DATANODESTORE_DATANODE_H_
#define MESSMER_BLOBSTORE_IMPLEMENTATIONS_ONBLOCKS_DATANODESTORE_DATANODE_H_
#include "DataNodeView.h"
#include <messmer/cpp-utils/data/Data.h>
namespace blobstore {
namespace onblocks {
namespace datanodestore {
class DataNodeStore;
class DataInnerNode;
class DataNode {
public:
virtual ~DataNode();
const blockstore::Key &key() const;
uint8_t depth() const;
static cpputils::unique_ref<DataInnerNode> convertToNewInnerNode(cpputils::unique_ref<DataNode> node, const DataNode &first_child);
void flush() const;
protected:
DataNode(DataNodeView block);
DataNodeView &node();
const DataNodeView &node() const;
friend class DataNodeStore;
private:
DataNodeView _node;
DISALLOW_COPY_AND_ASSIGN(DataNode);
};
}
}
}
#endif

View File

@ -0,0 +1,113 @@
#include "DataInnerNode.h"
#include "DataLeafNode.h"
#include "DataNodeStore.h"
#include "messmer/blockstore/interface/BlockStore.h"
#include "messmer/blockstore/interface/Block.h"
#include "messmer/blockstore/utils/BlockStoreUtils.h"
#include <messmer/cpp-utils/assert/assert.h>
using blockstore::BlockStore;
using blockstore::Block;
using blockstore::Key;
using cpputils::Data;
using cpputils::unique_ref;
using cpputils::make_unique_ref;
using std::runtime_error;
using boost::optional;
using boost::none;
namespace blobstore {
namespace onblocks {
namespace datanodestore {
DataNodeStore::DataNodeStore(unique_ref<BlockStore> blockstore, uint32_t blocksizeBytes)
: _blockstore(std::move(blockstore)), _layout(blocksizeBytes) {
}
DataNodeStore::~DataNodeStore() {
}
unique_ref<DataNode> DataNodeStore::load(unique_ref<Block> block) {
ASSERT(block->size() == _layout.blocksizeBytes(), "Loading block of wrong size");
DataNodeView node(std::move(block));
if (node.Depth() == 0) {
return make_unique_ref<DataLeafNode>(std::move(node));
} else if (node.Depth() <= MAX_DEPTH) {
return make_unique_ref<DataInnerNode>(std::move(node));
} else {
throw runtime_error("Tree is to deep. Data corruption?");
}
}
unique_ref<DataInnerNode> DataNodeStore::createNewInnerNode(const DataNode &first_child) {
ASSERT(first_child.node().layout().blocksizeBytes() == _layout.blocksizeBytes(), "Source node has wrong layout. Is it from the same DataNodeStore?");
//TODO Initialize block and then create it in the blockstore - this is more efficient than creating it and then writing to it
auto block = _blockstore->create(Data(_layout.blocksizeBytes()).FillWithZeroes());
return DataInnerNode::InitializeNewNode(std::move(block), first_child);
}
unique_ref<DataLeafNode> DataNodeStore::createNewLeafNode() {
//TODO Initialize block and then create it in the blockstore - this is more efficient than creating it and then writing to it
auto block = _blockstore->create(Data(_layout.blocksizeBytes()).FillWithZeroes());
return DataLeafNode::InitializeNewNode(std::move(block));
}
optional<unique_ref<DataNode>> DataNodeStore::load(const Key &key) {
auto block = _blockstore->load(key);
if (block == none) {
return none;
} else {
return load(std::move(*block));
}
}
unique_ref<DataNode> DataNodeStore::createNewNodeAsCopyFrom(const DataNode &source) {
ASSERT(source.node().layout().blocksizeBytes() == _layout.blocksizeBytes(), "Source node has wrong layout. Is it from the same DataNodeStore?");
auto newBlock = blockstore::utils::copyToNewBlock(_blockstore.get(), source.node().block());
return load(std::move(newBlock));
}
unique_ref<DataNode> DataNodeStore::overwriteNodeWith(unique_ref<DataNode> target, const DataNode &source) {
ASSERT(target->node().layout().blocksizeBytes() == _layout.blocksizeBytes(), "Target node has wrong layout. Is it from the same DataNodeStore?");
ASSERT(source.node().layout().blocksizeBytes() == _layout.blocksizeBytes(), "Source node has wrong layout. Is it from the same DataNodeStore?");
Key key = target->key();
{
auto targetBlock = target->node().releaseBlock();
cpputils::destruct(std::move(target)); // Call destructor
blockstore::utils::copyTo(targetBlock.get(), source.node().block());
}
auto loaded = load(key);
ASSERT(loaded != none, "Couldn't load the target node after overwriting it");
return std::move(*loaded);
}
void DataNodeStore::remove(unique_ref<DataNode> node) {
auto block = node->node().releaseBlock();
cpputils::destruct(std::move(node)); // Call destructor
_blockstore->remove(std::move(block));
}
uint64_t DataNodeStore::numNodes() const {
return _blockstore->numBlocks();
}
void DataNodeStore::removeSubtree(unique_ref<DataNode> node) {
DataInnerNode *inner = dynamic_cast<DataInnerNode*>(node.get());
if (inner != nullptr) {
for (uint32_t i = 0; i < inner->numChildren(); ++i) {
auto child = load(inner->getChild(i)->key());
ASSERT(child != none, "Couldn't load child node");
removeSubtree(std::move(*child));
}
}
remove(std::move(node));
}
DataNodeLayout DataNodeStore::layout() const {
return _layout;
}
}
}
}

View File

@ -0,0 +1,60 @@
#pragma once
#ifndef MESSMER_BLOBSTORE_IMPLEMENTATIONS_ONBLOCKS_DATANODESTORE_DATANODESTORE_H_
#define MESSMER_BLOBSTORE_IMPLEMENTATIONS_ONBLOCKS_DATANODESTORE_DATANODESTORE_H_
#include <memory>
#include "messmer/cpp-utils/macros.h"
#include "DataNodeView.h"
#include <messmer/blockstore/utils/Key.h>
namespace blockstore{
class Block;
class BlockStore;
}
namespace blobstore {
namespace onblocks {
namespace datanodestore {
class DataNode;
class DataLeafNode;
class DataInnerNode;
class DataNodeStore final {
public:
DataNodeStore(cpputils::unique_ref<blockstore::BlockStore> blockstore, uint32_t blocksizeBytes);
~DataNodeStore();
static constexpr uint8_t MAX_DEPTH = 10;
DataNodeLayout layout() const;
boost::optional<cpputils::unique_ref<DataNode>> load(const blockstore::Key &key);
cpputils::unique_ref<DataLeafNode> createNewLeafNode();
cpputils::unique_ref<DataInnerNode> createNewInnerNode(const DataNode &first_child);
cpputils::unique_ref<DataNode> createNewNodeAsCopyFrom(const DataNode &source);
cpputils::unique_ref<DataNode> overwriteNodeWith(cpputils::unique_ref<DataNode> target, const DataNode &source);
void remove(cpputils::unique_ref<DataNode> node);
void removeSubtree(cpputils::unique_ref<DataNode> node);
uint64_t numNodes() const;
//TODO Test overwriteNodeWith(), createNodeAsCopyFrom(), removeSubtree()
private:
cpputils::unique_ref<DataNode> load(cpputils::unique_ref<blockstore::Block> block);
cpputils::unique_ref<blockstore::BlockStore> _blockstore;
const DataNodeLayout _layout;
DISALLOW_COPY_AND_ASSIGN(DataNodeStore);
};
}
}
}
#endif

View File

@ -0,0 +1,140 @@
#pragma once
#ifndef MESSMER_BLOBSTORE_IMPLEMENTATIONS_ONBLOCKS_DATANODESTORE_DATANODEVIEW_H_
#define MESSMER_BLOBSTORE_IMPLEMENTATIONS_ONBLOCKS_DATANODESTORE_DATANODEVIEW_H_
#include "messmer/blockstore/interface/Block.h"
#include "../BlobStoreOnBlocks.h"
#include "DataInnerNode_ChildEntry.h"
#include <messmer/cpp-utils/pointer/unique_ref.h>
#include <memory>
#include <stdexcept>
#include <type_traits>
namespace blobstore {
namespace onblocks {
namespace datanodestore {
//TODO Move DataNodeLayout into own file
class DataNodeLayout final {
public:
constexpr DataNodeLayout(uint32_t blocksizeBytes)
:_blocksizeBytes(
(HEADERSIZE_BYTES + 2*sizeof(DataInnerNode_ChildEntry) <= blocksizeBytes)
? blocksizeBytes
: throw std::logic_error("Blocksize too small, not enough space to store two children in an inner node")) {
}
//Total size of the header
static constexpr uint32_t HEADERSIZE_BYTES = 8;
//Where in the header is the depth field
static constexpr uint32_t DEPTH_OFFSET_BYTES = 0;
//Where in the header is the size field (for inner nodes: number of children, for leafs: content data size)
static constexpr uint32_t SIZE_OFFSET_BYTES = 4;
//Size of a block (header + data region)
constexpr uint32_t blocksizeBytes() const {
return _blocksizeBytes;
}
//Number of bytes in the data region of a node
constexpr uint32_t datasizeBytes() const {
return _blocksizeBytes - HEADERSIZE_BYTES;
}
//Maximum number of children an inner node can store
constexpr uint32_t maxChildrenPerInnerNode() const {
return datasizeBytes() / sizeof(DataInnerNode_ChildEntry);
}
//Maximum number of bytes a leaf can store
//We are returning uint64_t here, because calculations involving maxBytesPerLeaf most probably should use 64bit integers to support blobs >4GB.
constexpr uint64_t maxBytesPerLeaf() const {
return datasizeBytes();
}
private:
uint32_t _blocksizeBytes;
};
class DataNodeView final {
public:
DataNodeView(cpputils::unique_ref<blockstore::Block> block): _block(std::move(block)) {
}
~DataNodeView() {}
DataNodeView(DataNodeView &&rhs) = default;
uint8_t Depth() const {
return *((uint8_t*)_block->data()+DataNodeLayout::DEPTH_OFFSET_BYTES);
}
void setDepth(uint8_t value) {
_block->write(&value, DataNodeLayout::DEPTH_OFFSET_BYTES, sizeof(value));
}
uint32_t Size() const {
return *(uint32_t*)((uint8_t*)_block->data()+DataNodeLayout::SIZE_OFFSET_BYTES);
}
void setSize(uint32_t value) {
_block->write(&value, DataNodeLayout::SIZE_OFFSET_BYTES, sizeof(value));
}
const void *data() const {
return (uint8_t*)_block->data() + DataNodeLayout::HEADERSIZE_BYTES;
}
void write(const void *source, uint64_t offset, uint64_t size) {
_block->write(source, offset + DataNodeLayout::HEADERSIZE_BYTES, size);
}
template<typename Entry>
const Entry *DataBegin() const {
return GetOffset<DataNodeLayout::HEADERSIZE_BYTES, Entry>();
}
template<typename Entry>
const Entry *DataEnd() const {
const unsigned int NUM_ENTRIES = layout().datasizeBytes() / sizeof(Entry);
return DataBegin<Entry>() + NUM_ENTRIES;
}
DataNodeLayout layout() const {
return DataNodeLayout(_block->size());
}
cpputils::unique_ref<blockstore::Block> releaseBlock() {
return std::move(_block);
}
const blockstore::Block &block() const {
return *_block;
}
const blockstore::Key &key() const {
return _block->key();
}
void flush() const {
_block->flush();
}
private:
template<int offset, class Type>
const Type *GetOffset() const {
return (Type*)(((const int8_t*)_block->data())+offset);
}
cpputils::unique_ref<blockstore::Block> _block;
DISALLOW_COPY_AND_ASSIGN(DataNodeView);
};
}
}
}
#endif

View File

@ -0,0 +1,334 @@
#include "DataTree.h"
#include "../datanodestore/DataNodeStore.h"
#include "../datanodestore/DataInnerNode.h"
#include "../datanodestore/DataLeafNode.h"
#include "../utils/Math.h"
#include "impl/algorithms.h"
#include "messmer/cpp-utils/pointer/cast.h"
#include "messmer/cpp-utils/pointer/optional_ownership_ptr.h"
#include <cmath>
#include <messmer/cpp-utils/assert/assert.h>
using blockstore::Key;
using blobstore::onblocks::datanodestore::DataNodeStore;
using blobstore::onblocks::datanodestore::DataNode;
using blobstore::onblocks::datanodestore::DataInnerNode;
using blobstore::onblocks::datanodestore::DataLeafNode;
using blobstore::onblocks::datanodestore::DataNodeLayout;
using std::dynamic_pointer_cast;
using std::function;
using boost::shared_mutex;
using boost::shared_lock;
using boost::unique_lock;
using boost::none;
using std::vector;
using cpputils::dynamic_pointer_move;
using cpputils::optional_ownership_ptr;
using cpputils::WithOwnership;
using cpputils::WithoutOwnership;
using cpputils::unique_ref;
namespace blobstore {
namespace onblocks {
namespace datatreestore {
DataTree::DataTree(DataNodeStore *nodeStore, unique_ref<DataNode> rootNode)
: _mutex(), _nodeStore(nodeStore), _rootNode(std::move(rootNode)) {
}
DataTree::~DataTree() {
}
void DataTree::removeLastDataLeaf() {
auto deletePosOrNull = algorithms::GetLowestRightBorderNodeWithMoreThanOneChildOrNull(_nodeStore, _rootNode.get());
ASSERT(deletePosOrNull.get() != nullptr, "Tree has only one leaf, can't shrink it.");
deleteLastChildSubtree(deletePosOrNull.get());
ifRootHasOnlyOneChildReplaceRootWithItsChild();
}
void DataTree::ifRootHasOnlyOneChildReplaceRootWithItsChild() {
DataInnerNode *rootNode = dynamic_cast<DataInnerNode*>(_rootNode.get());
ASSERT(rootNode != nullptr, "RootNode is not an inner node");
if (rootNode->numChildren() == 1) {
auto child = _nodeStore->load(rootNode->getChild(0)->key());
ASSERT(child != none, "Couldn't load first child of root node");
_rootNode = _nodeStore->overwriteNodeWith(std::move(_rootNode), **child);
_nodeStore->remove(std::move(*child));
}
}
void DataTree::deleteLastChildSubtree(DataInnerNode *node) {
auto lastChild = _nodeStore->load(node->LastChild()->key());
ASSERT(lastChild != none, "Couldn't load last child");
_nodeStore->removeSubtree(std::move(*lastChild));
node->removeLastChild();
}
unique_ref<DataLeafNode> DataTree::addDataLeaf() {
auto insertPosOrNull = algorithms::GetLowestInnerRightBorderNodeWithLessThanKChildrenOrNull(_nodeStore, _rootNode.get());
if (insertPosOrNull) {
return addDataLeafAt(insertPosOrNull.get());
} else {
return addDataLeafToFullTree();
}
}
unique_ref<DataLeafNode> DataTree::addDataLeafAt(DataInnerNode *insertPos) {
auto new_leaf = _nodeStore->createNewLeafNode();
auto chain = createChainOfInnerNodes(insertPos->depth()-1, new_leaf.get());
insertPos->addChild(*chain);
return new_leaf;
}
optional_ownership_ptr<DataNode> DataTree::createChainOfInnerNodes(unsigned int num, DataNode *child) {
//TODO This function is implemented twice, once with optional_ownership_ptr, once with unique_ref. Redundancy!
optional_ownership_ptr<DataNode> chain = cpputils::WithoutOwnership<DataNode>(child);
for(unsigned int i=0; i<num; ++i) {
auto newnode = _nodeStore->createNewInnerNode(*chain);
chain = cpputils::WithOwnership<DataNode>(std::move(newnode));
}
return chain;
}
unique_ref<DataNode> DataTree::createChainOfInnerNodes(unsigned int num, unique_ref<DataNode> child) {
unique_ref<DataNode> chain = std::move(child);
for(unsigned int i=0; i<num; ++i) {
chain = _nodeStore->createNewInnerNode(*chain);
}
return chain;
}
DataInnerNode* DataTree::increaseTreeDepth(unsigned int levels) {
ASSERT(levels >= 1, "Parameter out of bounds: tried to increase tree depth by zero.");
auto copyOfOldRoot = _nodeStore->createNewNodeAsCopyFrom(*_rootNode);
auto chain = createChainOfInnerNodes(levels-1, copyOfOldRoot.get());
auto newRootNode = DataNode::convertToNewInnerNode(std::move(_rootNode), *chain);
DataInnerNode *result = newRootNode.get();
_rootNode = std::move(newRootNode);
return result;
}
unique_ref<DataLeafNode> DataTree::addDataLeafToFullTree() {
DataInnerNode *rootNode = increaseTreeDepth(1);
auto newLeaf = addDataLeafAt(rootNode);
return newLeaf;
}
const Key &DataTree::key() const {
return _rootNode->key();
}
void DataTree::flush() const {
// By grabbing a lock, we ensure that all modifying functions don't run currently and are therefore flushed
unique_lock<shared_mutex> lock(_mutex);
// We also have to flush the root node
_rootNode->flush();
}
unique_ref<DataNode> DataTree::releaseRootNode() {
return std::move(_rootNode);
}
//TODO Test numLeaves(), for example also two configurations with same number of bytes but different number of leaves (last leaf has 0 bytes)
uint32_t DataTree::numLeaves() const {
shared_lock<shared_mutex> lock(_mutex);
return _numLeaves(*_rootNode);
}
uint32_t DataTree::_numLeaves(const DataNode &node) const {
const DataLeafNode *leaf = dynamic_cast<const DataLeafNode*>(&node);
if (leaf != nullptr) {
return 1;
}
const DataInnerNode &inner = dynamic_cast<const DataInnerNode&>(node);
uint64_t numLeavesInLeftChildren = (inner.numChildren()-1) * leavesPerFullChild(inner);
auto lastChild = _nodeStore->load(inner.LastChild()->key());
ASSERT(lastChild != none, "Couldn't load last child");
uint64_t numLeavesInRightChild = _numLeaves(**lastChild);
return numLeavesInLeftChildren + numLeavesInRightChild;
}
void DataTree::traverseLeaves(uint32_t beginIndex, uint32_t endIndex, function<void (DataLeafNode*, uint32_t)> func) {
//TODO Can we traverse in parallel?
unique_lock<shared_mutex> lock(_mutex); //TODO Only lock when resizing. Otherwise parallel read/write to a blob is not possible!
ASSERT(beginIndex <= endIndex, "Invalid parameters");
if (0 == endIndex) {
// In this case the utils::ceilLog(_, endIndex) below would fail
return;
}
uint8_t neededTreeDepth = utils::ceilLog(_nodeStore->layout().maxChildrenPerInnerNode(), endIndex);
uint32_t numLeaves = this->_numLeaves(*_rootNode); // TODO Querying the size causes a tree traversal down to the leaves. Possible without querying the size?
if (_rootNode->depth() < neededTreeDepth) {
//TODO Test cases that actually increase it here by 0 level / 1 level / more than 1 level
increaseTreeDepth(neededTreeDepth - _rootNode->depth());
}
if (numLeaves <= beginIndex) {
//TODO Test cases with numLeaves < / >= beginIndex
// There is a gap between the current size and the begin of the traversal
return _traverseLeaves(_rootNode.get(), 0, numLeaves-1, endIndex, [beginIndex, numLeaves, &func, this](DataLeafNode* node, uint32_t index) {
if (index >= beginIndex) {
func(node, index);
} else if (index == numLeaves - 1) {
// It is the old last leaf - resize it to maximum
node->resize(_nodeStore->layout().maxBytesPerLeaf());
}
});
} else if (numLeaves < endIndex) {
// We are starting traversal in the valid region, but traverse until after it (we grow new leaves)
return _traverseLeaves(_rootNode.get(), 0, beginIndex, endIndex, [numLeaves, &func, this] (DataLeafNode *node, uint32_t index) {
if (index == numLeaves - 1) {
// It is the old last leaf - resize it to maximum
node->resize(_nodeStore->layout().maxBytesPerLeaf());
}
func(node, index);
});
} else {
//We are traversing entirely inside the valid region
_traverseLeaves(_rootNode.get(), 0, beginIndex, endIndex, func);
}
}
void DataTree::_traverseLeaves(DataNode *root, uint32_t leafOffset, uint32_t beginIndex, uint32_t endIndex, function<void (DataLeafNode*, uint32_t)> func) {
DataLeafNode *leaf = dynamic_cast<DataLeafNode*>(root);
if (leaf != nullptr) {
ASSERT(beginIndex <= 1 && endIndex <= 1, "If root node is a leaf, the (sub)tree has only one leaf - access indices must be 0 or 1.");
if (beginIndex == 0 && endIndex == 1) {
func(leaf, leafOffset);
}
return;
}
DataInnerNode *inner = dynamic_cast<DataInnerNode*>(root);
uint32_t leavesPerChild = leavesPerFullChild(*inner);
uint32_t beginChild = beginIndex/leavesPerChild;
uint32_t endChild = utils::ceilDivision(endIndex, leavesPerChild);
vector<unique_ref<DataNode>> children = getOrCreateChildren(inner, beginChild, endChild);
for (uint32_t childIndex = beginChild; childIndex < endChild; ++childIndex) {
uint32_t childOffset = childIndex * leavesPerChild;
uint32_t localBeginIndex = utils::maxZeroSubtraction(beginIndex, childOffset);
uint32_t localEndIndex = std::min(leavesPerChild, endIndex - childOffset);
auto child = std::move(children[childIndex-beginChild]);
_traverseLeaves(child.get(), leafOffset + childOffset, localBeginIndex, localEndIndex, func);
}
}
vector<unique_ref<DataNode>> DataTree::getOrCreateChildren(DataInnerNode *node, uint32_t begin, uint32_t end) {
vector<unique_ref<DataNode>> children;
children.reserve(end-begin);
for (uint32_t childIndex = begin; childIndex < std::min(node->numChildren(), end); ++childIndex) {
auto child = _nodeStore->load(node->getChild(childIndex)->key());
ASSERT(child != none, "Couldn't load child node");
children.emplace_back(std::move(*child));
}
for (uint32_t childIndex = node->numChildren(); childIndex < end; ++childIndex) {
//TODO This creates each child with one chain to one leaf only, and then on the next lower level it
// has to create the children for the child. Would be faster to directly create full trees if necessary.
children.emplace_back(addChildTo(node));
}
ASSERT(children.size() == end-begin, "Number of children in the result is wrong");
return children;
}
unique_ref<DataNode> DataTree::addChildTo(DataInnerNode *node) {
auto new_leaf = _nodeStore->createNewLeafNode();
new_leaf->resize(_nodeStore->layout().maxBytesPerLeaf());
auto chain = createChainOfInnerNodes(node->depth()-1, std::move(new_leaf));
node->addChild(*chain);
return std::move(chain);
}
uint32_t DataTree::leavesPerFullChild(const DataInnerNode &root) const {
return utils::intPow(_nodeStore->layout().maxChildrenPerInnerNode(), (uint32_t)root.depth()-1);
}
uint64_t DataTree::numStoredBytes() const {
shared_lock<shared_mutex> lock(_mutex);
return _numStoredBytes();
}
uint64_t DataTree::_numStoredBytes() const {
return _numStoredBytes(*_rootNode);
}
uint64_t DataTree::_numStoredBytes(const DataNode &root) const {
const DataLeafNode *leaf = dynamic_cast<const DataLeafNode*>(&root);
if (leaf != nullptr) {
return leaf->numBytes();
}
const DataInnerNode &inner = dynamic_cast<const DataInnerNode&>(root);
uint64_t numBytesInLeftChildren = (inner.numChildren()-1) * leavesPerFullChild(inner) * _nodeStore->layout().maxBytesPerLeaf();
auto lastChild = _nodeStore->load(inner.LastChild()->key());
ASSERT(lastChild != none, "Couldn't load last child");
uint64_t numBytesInRightChild = _numStoredBytes(**lastChild);
return numBytesInLeftChildren + numBytesInRightChild;
}
void DataTree::resizeNumBytes(uint64_t newNumBytes) {
//TODO Can we resize in parallel? Especially creating new blocks (i.e. encrypting them) is expensive and should be done in parallel.
boost::upgrade_lock<shared_mutex> lock(_mutex);
{
boost::upgrade_to_unique_lock<shared_mutex> exclusiveLock(lock);
//TODO Faster implementation possible (no addDataLeaf()/removeLastDataLeaf() in a loop, but directly resizing)
LastLeaf(_rootNode.get())->resize(_nodeStore->layout().maxBytesPerLeaf());
uint64_t currentNumBytes = _numStoredBytes();
ASSERT(currentNumBytes % _nodeStore->layout().maxBytesPerLeaf() == 0, "The last leaf is not a max data leaf, although we just resized it to be one.");
uint32_t currentNumLeaves = currentNumBytes / _nodeStore->layout().maxBytesPerLeaf();
uint32_t newNumLeaves = std::max(UINT64_C(1), utils::ceilDivision(newNumBytes, _nodeStore->layout().maxBytesPerLeaf()));
for(uint32_t i = currentNumLeaves; i < newNumLeaves; ++i) {
addDataLeaf()->resize(_nodeStore->layout().maxBytesPerLeaf());
}
for(uint32_t i = currentNumLeaves; i > newNumLeaves; --i) {
removeLastDataLeaf();
}
uint32_t newLastLeafSize = newNumBytes - (newNumLeaves-1)*_nodeStore->layout().maxBytesPerLeaf();
LastLeaf(_rootNode.get())->resize(newLastLeafSize);
}
ASSERT(newNumBytes == _numStoredBytes(), "We resized to the wrong number of bytes ("+std::to_string(numStoredBytes())+" instead of "+std::to_string(newNumBytes)+")");
}
optional_ownership_ptr<DataLeafNode> DataTree::LastLeaf(DataNode *root) {
DataLeafNode *leaf = dynamic_cast<DataLeafNode*>(root);
if (leaf != nullptr) {
return WithoutOwnership(leaf);
}
DataInnerNode *inner = dynamic_cast<DataInnerNode*>(root);
auto lastChild = _nodeStore->load(inner->LastChild()->key());
ASSERT(lastChild != none, "Couldn't load last child");
return WithOwnership(LastLeaf(std::move(*lastChild)));
}
unique_ref<DataLeafNode> DataTree::LastLeaf(unique_ref<DataNode> root) {
auto leaf = dynamic_pointer_move<DataLeafNode>(root);
if (leaf != none) {
return std::move(*leaf);
}
auto inner = dynamic_pointer_move<DataInnerNode>(root);
ASSERT(inner != none, "Root node is neither a leaf nor an inner node");
auto child = _nodeStore->load((*inner)->LastChild()->key());
ASSERT(child != none, "Couldn't load last child");
return LastLeaf(std::move(*child));
}
uint64_t DataTree::maxBytesPerLeaf() const {
return _nodeStore->layout().maxBytesPerLeaf();
}
}
}
}

View File

@ -0,0 +1,79 @@
#pragma once
#ifndef MESSMER_BLOBSTORE_IMPLEMENTATIONS_ONBLOCKS_DATATREE_H_
#define MESSMER_BLOBSTORE_IMPLEMENTATIONS_ONBLOCKS_DATATREE_H_
#include <memory>
#include <messmer/cpp-utils/macros.h>
#include <messmer/cpp-utils/pointer/optional_ownership_ptr.h>
#include "../datanodestore/DataNodeView.h"
//TODO Replace with C++14 once std::shared_mutex is supported