Introduce DataNodeStore and refactor DataNode

This commit is contained in:
Sebastian Messmer 2014-12-13 17:43:02 +01:00
parent a97eb08224
commit 9ccb583b4b
11 changed files with 258 additions and 139 deletions

View File

@ -1,3 +1,3 @@
add_library(blobstore_onblocks BlobOnBlocks.cpp BlobStoreOnBlocks.cpp impl/DataNode.cpp impl/DataLeafNode.cpp impl/DataInnerNode.cpp)
add_library(blobstore_onblocks BlobOnBlocks.cpp BlobStoreOnBlocks.cpp impl/DataNode.cpp impl/DataLeafNode.cpp impl/DataInnerNode.cpp impl/DataNodeStore.cpp)
target_link_libraries(blobstore_onblocks blockstore_interface)

View File

@ -1,5 +1,7 @@
#include <blobstore/implementations/onblocks/impl/DataNodeStore.h>
#include "DataInnerNode.h"
using std::unique_ptr;
using blockstore::Block;
using blockstore::Data;
@ -7,17 +9,17 @@ using blockstore::Data;
namespace blobstore {
namespace onblocks {
DataInnerNode::DataInnerNode(DataNodeView view)
: DataNode(std::move(view)) {
DataInnerNode::DataInnerNode(DataNodeView view, const Key &key, DataNodeStore *nodestorage)
: DataNode(std::move(view), key, nodestorage) {
}
DataInnerNode::~DataInnerNode() {
}
void DataInnerNode::InitializeNewNode(const Key &first_child_key, const DataNodeView &first_child) {
*_node.Depth() = *first_child.Depth() + 1;
*_node.Size() = 1;
first_child_key.ToBinary(ChildrenBegin()->key);
void DataInnerNode::InitializeNewNode(const DataNode &first_child) {
*node().Depth() = first_child.depth() + 1;
*node().Size() = 1;
first_child.key().ToBinary(ChildrenBegin()->key);
}
void DataInnerNode::read(off_t offset, size_t count, Data *result) const {
@ -28,9 +30,12 @@ void DataInnerNode::read(off_t offset, size_t count, Data *result) const {
uint8_t *target = (uint8_t*)result->data();
const ChildEntry *child = ChildContainingFirstByteAfterOffset(offset);
off_t blockrelative_offset = offset - numBytesInLeftwardSiblings(child);
uint64_t already_read_bytes = readFromChild(child, blockrelative_offset, count, target);
while(numBytesInChildAndLeftwardSiblings(child) < end) {
uint32_t child_index = child-ChildrenBegin();
uint64_t child_first_byte_index = maxNumBytesPerChild() * child_index;
uint64_t next_child_first_byte_index = child_first_byte_index + maxNumBytesPerChild();
off_t childrelative_offset = offset - child_first_byte_index;
uint64_t already_read_bytes = readFromChild(child, childrelative_offset, count, target);
while(next_child_first_byte_index < end) { //TODO Write a test case that breaks when we're having <= instead of < here
++child;
already_read_bytes += readFromChild(child, 0, count, target + already_read_bytes);
};
@ -38,7 +43,8 @@ void DataInnerNode::read(off_t offset, size_t count, Data *result) const {
}
uint64_t DataInnerNode::readFromChild(const ChildEntry *child, off_t inner_offset, size_t count, uint8_t *target) const {
uint64_t readable_bytes = std::min(count, numBytesInChild(child) - inner_offset);
//TODO This only works for non-rightmost children
uint64_t readable_bytes = std::min(count, maxNumBytesPerChild() - inner_offset);
//TODO READ...
@ -46,48 +52,49 @@ uint64_t DataInnerNode::readFromChild(const ChildEntry *child, off_t inner_offse
}
const DataInnerNode::ChildEntry *DataInnerNode::ChildContainingFirstByteAfterOffset(off_t offset) const {
uint32_t offset_blocks = offset / _node.BLOCKSIZE_BYTES;
uint8_t child_index = offset/maxNumBytesPerChild();
return ChildrenBegin()+child_index;
}
//TODO no binary search anymore
return
std::upper_bound(ChildrenBegin(), ChildrenEnd(), offset_blocks, [](uint32_t offset_blocks, const ChildEntry &child) {
return false;//return offset_blocks < child.numBlocksInThisAndLeftwardNodes;
});
uint32_t DataInnerNode::maxNumDataBlocksPerChild() const {
return std::round(std::pow(MAX_STORED_CHILDREN, *node().Depth()));
}
uint64_t DataInnerNode::numBytesInThisNode() const {
return numBytesInChildAndLeftwardSiblings(ChildrenLast());
return numBytesInNonRightmostChildrenSum() + numBytesInRightmostChild();
}
uint64_t DataInnerNode::numBytesInChild(const ChildEntry *child) const {
return numBytesInChildAndLeftwardSiblings(child) - numBytesInLeftwardSiblings(child);
uint64_t DataInnerNode::numBytesInNonRightmostChildrenSum() const {
return maxNumBytesPerChild() * (numChildren()-1);
}
uint64_t DataInnerNode::numBytesInLeftwardSiblings(const ChildEntry *child) const {
if (child == ChildrenBegin()) {
return 0;
}
return numBytesInChildAndLeftwardSiblings(child-1);
uint64_t DataInnerNode::numBytesInRightmostChild() const {
Key rightmost_child_key = Key::FromBinary(RightmostChild()->key);
auto rightmost_child = storage().load(rightmost_child_key);
return rightmost_child->numBytesInThisNode();
}
uint64_t DataInnerNode::numBytesInChildAndLeftwardSiblings(const ChildEntry *child) const {
//TODO Rewrite
//return (uint64_t)child->numBlocksInThisAndLeftwardNodes * _node.BLOCKSIZE_BYTES;
uint32_t DataInnerNode::numChildren() const {
return *node().Size();
}
//TODO This only works for non-rightmost children
uint64_t DataInnerNode::maxNumBytesPerChild() const {
return maxNumDataBlocksPerChild() * DataNodeView::DATASIZE_BYTES;
}
DataInnerNode::ChildEntry *DataInnerNode::ChildrenBegin() {
return const_cast<ChildEntry*>(const_cast<const DataInnerNode*>(this)->ChildrenBegin());
}
const DataInnerNode::ChildEntry *DataInnerNode::ChildrenBegin() const {
return _node.DataBegin<ChildEntry>();
return node().DataBegin<ChildEntry>();
}
const DataInnerNode::ChildEntry *DataInnerNode::ChildrenEnd() const {
return ChildrenBegin() + *_node.Size();
return ChildrenBegin() + *node().Size();
}
const DataInnerNode::ChildEntry *DataInnerNode::ChildrenLast() const{
const DataInnerNode::ChildEntry *DataInnerNode::RightmostChild() const{
return ChildrenEnd()-1;
}

View File

@ -9,7 +9,7 @@ namespace onblocks {
class DataInnerNode: public DataNode {
public:
DataInnerNode(DataNodeView block);
DataInnerNode(DataNodeView block, const Key &key, DataNodeStore *nodestorage);
virtual ~DataInnerNode();
struct ChildEntry {
@ -18,7 +18,7 @@ public:
static constexpr uint32_t MAX_STORED_CHILDREN = DataNodeView::DATASIZE_BYTES / sizeof(ChildEntry);
void InitializeNewNode(const Key &first_child_key, const DataNodeView &first_child);
void InitializeNewNode(const DataNode &first_child);
void read(off_t offset, size_t count, blockstore::Data *result) const override;
void write(off_t offset, size_t count, const blockstore::Data &data) override;
@ -30,14 +30,16 @@ private:
ChildEntry *ChildrenBegin();
const ChildEntry *ChildrenBegin() const;
const ChildEntry *ChildrenEnd() const;
const ChildEntry *ChildrenLast() const;
const ChildEntry *RightmostChild() const;
uint64_t readFromChild(const ChildEntry *child, off_t inner_offset, size_t count, uint8_t *target) const;
uint32_t numChildren() const;
uint32_t maxNumDataBlocksPerChild() const;
uint64_t maxNumBytesPerChild() const;
uint64_t numBytesInNonRightmostChildrenSum() const;
uint64_t numBytesInRightmostChild() const;
const ChildEntry *ChildContainingFirstByteAfterOffset(off_t offset) const;
uint64_t numBytesInChildAndLeftwardSiblings(const ChildEntry *child) const;
uint64_t numBytesInLeftwardSiblings(const ChildEntry *child) const;
uint64_t numBytesInChild(const ChildEntry *child) const;
};
}

View File

@ -7,8 +7,8 @@ using blockstore::Data;
namespace blobstore {
namespace onblocks {
DataLeafNode::DataLeafNode(DataNodeView view)
: DataNode(std::move(view)) {
DataLeafNode::DataLeafNode(DataNodeView view, const Key &key, DataNodeStore *nodestorage)
: DataNode(std::move(view), key, nodestorage) {
assert(numBytesInThisNode() <= MAX_STORED_BYTES);
}
@ -18,27 +18,27 @@ DataLeafNode::~DataLeafNode() {
void DataLeafNode::read(off_t offset, size_t count, Data *result) const {
assert(count <= result->size());
assert(offset+count <= numBytesInThisNode());
std::memcpy(result->data(), _node.DataBegin<unsigned char>()+offset, count);
std::memcpy(result->data(), node().DataBegin<unsigned char>()+offset, count);
}
void DataLeafNode::write(off_t offset, size_t count, const Data &data) {
assert(count <= data.size());
assert(offset+count <= numBytesInThisNode());
std::memcpy(_node.DataBegin<unsigned char>()+offset, data.data(), count);
std::memcpy(node().DataBegin<unsigned char>()+offset, data.data(), count);
}
void DataLeafNode::InitializeNewNode() {
*_node.Depth() = 0;
*_node.Size() = 0;
*node().Depth() = 0;
*node().Size() = 0;
//fillDataWithZeroes(); not needed, because a newly created block will be zeroed out. DataLeafNodeTest.SpaceIsZeroFilledWhenGrowing ensures this.
}
void DataLeafNode::fillDataWithZeroesFromTo(off_t begin, off_t end) {
std::memset(_node.DataBegin<unsigned char>()+begin, 0, end-begin);
std::memset(node().DataBegin<unsigned char>()+begin, 0, end-begin);
}
uint64_t DataLeafNode::numBytesInThisNode() const {
return *_node.Size();
return *node().Size();
}
void DataLeafNode::resize(uint64_t newsize_bytes) {
@ -47,11 +47,11 @@ void DataLeafNode::resize(uint64_t newsize_bytes) {
// If we're shrinking, we want to delete the old data
// (overwrite it with zeroes).
// TODO Mention this in thesis
if (newsize_bytes < *_node.Size()) {
fillDataWithZeroesFromTo(newsize_bytes, *_node.Size());
if (newsize_bytes < *node().Size()) {
fillDataWithZeroesFromTo(newsize_bytes, *node().Size());
}
*_node.Size() = newsize_bytes;
*node().Size() = newsize_bytes;
}
}

View File

@ -9,7 +9,7 @@ namespace onblocks {
class DataLeafNode: public DataNode {
public:
DataLeafNode(DataNodeView block);
DataLeafNode(DataNodeView block, const Key &key, DataNodeStore *nodestorage);
virtual ~DataLeafNode();
static constexpr uint32_t MAX_STORED_BYTES = DataNodeView::DATASIZE_BYTES;

View File

@ -1,3 +1,4 @@
#include <blobstore/implementations/onblocks/impl/DataNodeStore.h>
#include "DataNode.h"
#include "DataInnerNode.h"
@ -12,35 +13,35 @@ using std::runtime_error;
namespace blobstore {
namespace onblocks {
DataNode::DataNode(DataNodeView node)
: _node(std::move(node)) {
DataNode::DataNode(DataNodeView node, const Key &key, DataNodeStore *nodestorage)
: _key(key), _node(std::move(node)), _nodestorage(nodestorage) {
}
DataNode::~DataNode() {
}
unique_ptr<DataNode> DataNode::load(unique_ptr<Block> block) {
DataNodeView node(std::move(block));
if (*node.Depth() == 0) {
return unique_ptr<DataLeafNode>(new DataLeafNode(std::move(node)));
} else if (*node.Depth() < MAX_DEPTH) {
return unique_ptr<DataInnerNode>(new DataInnerNode(std::move(node)));
} else {
throw runtime_error("Tree is to deep. Data corruption?");
}
DataNodeStore &DataNode::storage() {
return const_cast<DataNodeStore&>(const_cast<const DataNode*>(this)->storage());
}
unique_ptr<DataNode> DataNode::createNewInnerNode(unique_ptr<Block> block, const Key &first_child_key, const DataNode &first_child) {
auto newNode = unique_ptr<DataInnerNode>(new DataInnerNode(std::move(block)));
newNode->InitializeNewNode(first_child_key, first_child._node);
return std::move(newNode);
const DataNodeStore &DataNode::storage() const {
return *_nodestorage;
}
unique_ptr<DataNode> DataNode::createNewLeafNode(unique_ptr<Block> block) {
auto newNode = unique_ptr<DataLeafNode>(new DataLeafNode(std::move(block)));
newNode->InitializeNewNode();
return std::move(newNode);
DataNodeView &DataNode::node() {
return const_cast<DataNodeView&>(const_cast<const DataNode*>(this)->node());
}
const DataNodeView &DataNode::node() const {
return _node;
}
const Key &DataNode::key() const {
return _key;
}
uint8_t DataNode::depth() const {
return *_node.Depth();
}
}

View File

@ -8,27 +8,37 @@
namespace blobstore {
namespace onblocks {
class DataNodeStore;
class DataNode {
public:
virtual ~DataNode();
static constexpr uint8_t MAX_DEPTH = 10;
virtual void read(off_t offset, size_t count, blockstore::Data *result) const = 0;
virtual void write(off_t offset, size_t count, const blockstore::Data &data) = 0;
virtual void resize(uint64_t newsize_bytes) = 0;
virtual uint64_t numBytesInThisNode() const = 0;
static std::unique_ptr<DataNode> load(std::unique_ptr<blockstore::Block> block);
static std::unique_ptr<DataNode> createNewLeafNode(std::unique_ptr<blockstore::Block> block);
static std::unique_ptr<DataNode> createNewInnerNode(std::unique_ptr<blockstore::Block> block, const Key &first_child_key, const DataNode &first_child);
const Key &key() const;
uint8_t depth() const;
protected:
DataNode(DataNodeView block);
DataNode(DataNodeView block, const Key &key, DataNodeStore *nodestorage);
DataNodeStore &storage();
const DataNodeStore &storage() const;
DataNodeView &node();
const DataNodeView &node() const;
private:
Key _key; //TODO Remove this and make blockstore::Block store the key
DataNodeView _node;
DataNodeStore *_nodestorage;
DISALLOW_COPY_AND_ASSIGN(DataNode);
};
}

View File

@ -0,0 +1,60 @@
#include <blobstore/implementations/onblocks/impl/DataNodeStore.h>
#include "blockstore/interface/BlockStore.h"
#include "blockstore/interface/Block.h"
#include "DataLeafNode.h"
#include "DataInnerNode.h"
using blockstore::BlockStore;
using blockstore::Block;
using blockstore::Key;
using std::unique_ptr;
using std::make_unique;
using std::runtime_error;
namespace blobstore {
namespace onblocks {
DataNodeStore::DataNodeStore(unique_ptr<BlockStore> blockstore)
: _blockstore(std::move(blockstore)) {
}
DataNodeStore::~DataNodeStore() {
}
unique_ptr<DataNode> DataNodeStore::load(unique_ptr<Block> block, const Key &key) {
DataNodeView node(std::move(block));
if (*node.Depth() == 0) {
return unique_ptr<DataLeafNode>(new DataLeafNode(std::move(node), key, this));
} else if (*node.Depth() <= MAX_DEPTH) {
return unique_ptr<DataInnerNode>(new DataInnerNode(std::move(node), key, this));
} else {
throw runtime_error("Tree is to deep. Data corruption?");
}
}
unique_ptr<DataNode> DataNodeStore::createNewInnerNode(const DataNode &first_child) {
auto block = _blockstore->create(DataNodeView::BLOCKSIZE_BYTES);
auto newNode = make_unique<DataInnerNode>(std::move(block.block), block.key, this);
newNode->InitializeNewNode(first_child);
return std::move(newNode);
}
unique_ptr<DataNode> DataNodeStore::createNewLeafNode() {
auto block = _blockstore->create(DataNodeView::BLOCKSIZE_BYTES);
auto newNode = make_unique<DataLeafNode>(std::move(block.block), block.key, this);
newNode->InitializeNewNode();
return std::move(newNode);
}
unique_ptr<DataNode> DataNodeStore::load(const Key &key) {
return load(_blockstore->load(key), key);
}
unique_ptr<const DataNode> DataNodeStore::load(const Key &key) const {
return const_cast<DataNodeStore*>(this)->load(key);
}
}
}

View File

@ -0,0 +1,42 @@
#pragma once
#ifndef BLOBSTORE_IMPLEMENTATIONS_ONBLOCKS_IMPL_DATANODESTORE_H_
#define BLOBSTORE_IMPLEMENTATIONS_ONBLOCKS_IMPL_DATANODESTORE_H_
#include <memory>
#include "fspp/utils/macros.h"
namespace blockstore{
class Block;
class BlockStore;
class Key;
}
namespace blobstore {
namespace onblocks {
class DataNode;
class DataNodeStore {
public:
DataNodeStore(std::unique_ptr<blockstore::BlockStore> blockstore);
virtual ~DataNodeStore();
static constexpr uint8_t MAX_DEPTH = 10;
std::unique_ptr<DataNode> load(const blockstore::Key &key);
std::unique_ptr<const DataNode> load(const blockstore::Key &key) const;
std::unique_ptr<DataNode> createNewLeafNode();
std::unique_ptr<DataNode> createNewInnerNode(const DataNode &first_child);
private:
std::unique_ptr<DataNode> load(std::unique_ptr<blockstore::Block> block, const blockstore::Key &key);
std::unique_ptr<blockstore::BlockStore> _blockstore;
DISALLOW_COPY_AND_ASSIGN(DataNodeStore);
};
}
}
#endif

View File

@ -1,3 +1,4 @@
#include <blobstore/implementations/onblocks/impl/DataNodeStore.h>
#include <gtest/gtest.h>
#include "blockstore/implementations/testfake/FakeBlockStore.h"
@ -28,11 +29,10 @@ public:
DataLeafNodeTest():
ZEROES(DataLeafNode::MAX_STORED_BYTES),
randomData(DataLeafNode::MAX_STORED_BYTES),
blockStore(make_unique<FakeBlockStore>()),
block(blockStore->create(DataNodeView::BLOCKSIZE_BYTES)),
leafblock(blockStore->create(DataNodeView::BLOCKSIZE_BYTES)),
leafblockdata((uint8_t*)leafblock.block->data()),
leaf(DataNode::createNewLeafNode(std::move(leafblock.block))) {
_blockStore(make_unique<FakeBlockStore>()),
blockStore(_blockStore.get()),
nodeStore(make_unique<DataNodeStore>(std::move(_blockStore))),
leaf(nodeStore->createNewLeafNode()) {
ZEROES.FillWithZeroes();
@ -42,11 +42,10 @@ public:
}
Key WriteDataToNewLeafBlockAndReturnKey() {
auto block = blockStore->create(DataNodeView::BLOCKSIZE_BYTES);
auto leaf = DataNode::createNewLeafNode(std::move(block.block));
leaf->resize(randomData.size());
leaf->write(0, randomData.size(), randomData);
return block.key;
auto newleaf = nodeStore->createNewLeafNode();
newleaf->resize(randomData.size());
newleaf->write(0, randomData.size(), randomData);
return newleaf->key();
}
void FillLeafBlockWithData() {
@ -55,17 +54,22 @@ public:
}
void ReadDataFromLoadedLeafBlock(Key key, Data *data) {
auto leaf = DataNode::load(blockStore->load(key));
auto leaf = nodeStore->load(key);
EXPECT_IS_PTR_TYPE(DataLeafNode, leaf.get());
leaf->read(0, data->size(), data);
}
void ResizeLeaf(const Key &key, size_t size) {
auto leaf = nodeStore->load(key);
EXPECT_IS_PTR_TYPE(DataLeafNode, leaf.get());
leaf->resize(size);
}
Data ZEROES;
Data randomData;
unique_ptr<BlockStore> blockStore;
BlockWithKey block;
BlockWithKey leafblock;
const uint8_t *leafblockdata;
unique_ptr<BlockStore> _blockStore;
BlockStore *blockStore;
unique_ptr<DataNodeStore> nodeStore;
unique_ptr<DataNode> leaf;
};
@ -92,15 +96,20 @@ TEST_F(DataLeafNodeTest, NewLeafNodeHasSizeZero) {
}
TEST_F(DataLeafNodeTest, NewLeafNodeHasSizeZero_AfterLoading) {
{
DataNode::createNewLeafNode(std::move(block.block));
}
auto leaf = DataNode::load(blockStore->load(block.key));
Key key = nodeStore->createNewLeafNode()->key();
auto leaf = nodeStore->load(key);
EXPECT_EQ(0u, leaf->numBytesInThisNode());
}
class DataLeafNodeSizeTest: public DataLeafNodeTest, public WithParamInterface<unsigned int> {};
class DataLeafNodeSizeTest: public DataLeafNodeTest, public WithParamInterface<unsigned int> {
public:
Key CreateLeafResizeItAndReturnKey() {
auto leaf = nodeStore->createNewLeafNode();
leaf->resize(GetParam());
return leaf->key();
}
};
INSTANTIATE_TEST_CASE_P(DataLeafNodeSizeTest, DataLeafNodeSizeTest, Values(0, 1, 5, 16, 32, 512, DataLeafNode::MAX_STORED_BYTES));
TEST_P(DataLeafNodeSizeTest, ResizeNode_ReadSizeImmediately) {
@ -109,11 +118,9 @@ TEST_P(DataLeafNodeSizeTest, ResizeNode_ReadSizeImmediately) {
}
TEST_P(DataLeafNodeSizeTest, ResizeNode_ReadSizeAfterLoading) {
{
auto leaf = DataNode::createNewLeafNode(std::move(block.block));
leaf->resize(GetParam());
}
auto leaf = DataNode::load(blockStore->load(block.key));
Key key = CreateLeafResizeItAndReturnKey();
auto leaf = nodeStore->load(key);
EXPECT_EQ(GetParam(), leaf->numBytesInThisNode());
}
@ -139,14 +146,20 @@ TEST_F(DataLeafNodeTest, SpaceGetsZeroFilledWhenShrinkingAndRegrowing) {
}
TEST_F(DataLeafNodeTest, DataGetsZeroFilledWhenShrinking) {
FillLeafBlockWithData();
Key key = WriteDataToNewLeafBlockAndReturnKey();
uint32_t smaller_size = randomData.size() - 100;
//At first, we expect there to be random data in the underlying data block
EXPECT_EQ(0, std::memcmp((char*)randomData.data()+smaller_size, leafblockdata+DataNodeView::HEADERSIZE_BYTES+smaller_size, 100));
{
//At first, we expect there to be random data in the underlying data block
auto block = blockStore->load(key);
EXPECT_EQ(0, std::memcmp((char*)randomData.data()+smaller_size, (uint8_t*)block->data()+DataNodeView::HEADERSIZE_BYTES+smaller_size, 100));
}
//After shrinking, we expect there to be zeroes in the underlying data block
leaf->resize(smaller_size);
EXPECT_EQ(0, std::memcmp(ZEROES.data(), leafblockdata+DataNodeView::HEADERSIZE_BYTES+smaller_size, 100));
ResizeLeaf(key, smaller_size);
{
auto block = blockStore->load(leaf->key());
EXPECT_EQ(0, std::memcmp(ZEROES.data(), (uint8_t*)block->data()+DataNodeView::HEADERSIZE_BYTES+smaller_size, 100));
}
}
//TODO Write tests that only read part of the data

View File

@ -1,3 +1,4 @@
#include <blobstore/implementations/onblocks/impl/DataNodeStore.h>
#include <gtest/gtest.h>
#include "blockstore/implementations/testfake/FakeBlockStore.h"
@ -19,64 +20,48 @@ using namespace blobstore::onblocks;
class DataNodeTest: public Test {
public:
unique_ptr<BlockStore> blockStore = make_unique<FakeBlockStore>();
unique_ptr<BlockStore> _blockStore = make_unique<FakeBlockStore>();
BlockStore *blockStore = _blockStore.get();
unique_ptr<DataNodeStore> nodeStore = make_unique<DataNodeStore>(std::move(_blockStore));
};
#define EXPECT_IS_PTR_TYPE(Type, ptr) EXPECT_NE(nullptr, dynamic_cast<Type*>(ptr)) << "Given pointer cannot be cast to the given type"
TEST_F(DataNodeTest, CreateLeafNodeCreatesLeafNode) {
auto block = blockStore->create(BlobStoreOnBlocks::BLOCKSIZE);
auto node = DataNode::createNewLeafNode(std::move(block.block));
auto node = nodeStore->createNewLeafNode();
EXPECT_IS_PTR_TYPE(DataLeafNode, node.get());
}
TEST_F(DataNodeTest, CreateInnerNodeCreatesInnerNode) {
auto leafblock = blockStore->create(BlobStoreOnBlocks::BLOCKSIZE);
auto leaf = DataNode::createNewLeafNode(std::move(leafblock.block));
auto leaf = nodeStore->createNewLeafNode();
auto block = blockStore->create(BlobStoreOnBlocks::BLOCKSIZE);
auto node = DataNode::createNewInnerNode(std::move(block.block), leafblock.key, *leaf);
auto node = nodeStore->createNewInnerNode(*leaf);
EXPECT_IS_PTR_TYPE(DataInnerNode, node.get());
}
TEST_F(DataNodeTest, LeafNodeIsRecognizedAfterStoreAndLoad) {
auto block = blockStore->create(BlobStoreOnBlocks::BLOCKSIZE);
Key key = block.key;
{
DataNode::createNewLeafNode(std::move(block.block));
}
Key key = nodeStore->createNewLeafNode()->key();
auto loaded_node = DataNode::load(blockStore->load(key));
auto loaded_node = nodeStore->load(key);
EXPECT_IS_PTR_TYPE(DataLeafNode, loaded_node.get());
}
TEST_F(DataNodeTest, InnerNodeWithDepth1IsRecognizedAfterStoreAndLoad) {
auto block = blockStore->create(BlobStoreOnBlocks::BLOCKSIZE);
Key key = block.key;
{
auto leafblock = blockStore->create(BlobStoreOnBlocks::BLOCKSIZE);
auto leaf = DataNode::createNewLeafNode(std::move(leafblock.block));
DataNode::createNewInnerNode(std::move(block.block), leafblock.key, *leaf);
}
auto leaf = nodeStore->createNewLeafNode();
Key key = nodeStore->createNewInnerNode(*leaf)->key();
auto loaded_node = DataNode::load(blockStore->load(key));
auto loaded_node = nodeStore->load(key);
EXPECT_IS_PTR_TYPE(DataInnerNode, loaded_node.get());
}
TEST_F(DataNodeTest, InnerNodeWithDepth2IsRecognizedAfterStoreAndLoad) {
auto block = blockStore->create(BlobStoreOnBlocks::BLOCKSIZE);
Key key = block.key;
{
auto leafblock = blockStore->create(BlobStoreOnBlocks::BLOCKSIZE);
auto leaf = DataNode::createNewLeafNode(std::move(leafblock.block));
auto inner1block = blockStore->create(BlobStoreOnBlocks::BLOCKSIZE);
auto inner1 = DataNode::createNewInnerNode(std::move(inner1block.block), leafblock.key, *leaf);
DataNode::createNewInnerNode(std::move(block.block), inner1block.key, *inner1);
}
auto leaf = nodeStore->createNewLeafNode();
auto inner = nodeStore->createNewInnerNode(*leaf);
Key key = nodeStore->createNewInnerNode(*inner)->key();
auto loaded_node = DataNode::load(blockStore->load(key));
auto loaded_node = nodeStore->load(key);
EXPECT_IS_PTR_TYPE(DataInnerNode, loaded_node.get());
}
@ -86,11 +71,10 @@ TEST_F(DataNodeTest, DataNodeCrashesOnLoadIfDepthIsTooHigh) {
Key key = block.key;
{
DataNodeView view(std::move(block.block));
*view.Depth() = 200u; // this is an invalid depth
*view.Depth() = DataNodeStore::MAX_DEPTH + 1;
}
auto loaded_block = blockStore->load(key);
EXPECT_ANY_THROW(
DataNode::load(std::move(loaded_block))
nodeStore->load(key)
);
}