#include "DataInnerNode.h"
#include "DataLeafNode.h"
#include "DataNodeStore.h"
#include "messmer/blockstore/interface/BlockStore.h"
#include "messmer/blockstore/interface/Block.h"
#include "messmer/blockstore/utils/BlockStoreUtils.h"

#include <cassert>
#include <memory>
#include <stdexcept>
#include <utility>

using blockstore::BlockStore;
using blockstore::Block;
using blockstore::Key;
using std::unique_ptr;
using std::make_unique;
using std::runtime_error;

// NOTE(review): the template arguments in this file (unique_ptr<...>,
// dynamic_cast<...>) were reconstructed from how the values are used;
// confirm them against the declarations in DataNodeStore.h.

namespace blobstore {
namespace onblocks {
namespace datanodestore {

// Takes ownership of the given block store and sets up the node layout for
// blocks of blocksizeBytes bytes.
DataNodeStore::DataNodeStore(unique_ptr<BlockStore> blockstore, uint32_t blocksizeBytes)
  : _blockstore(std::move(blockstore)), _layout(blocksizeBytes) {
}

DataNodeStore::~DataNodeStore() {
}

// Wraps an already loaded block into the matching node type.
// A depth of 0 yields a leaf node, a depth of up to MAX_DEPTH yields an inner
// node. Throws std::runtime_error for any larger depth.
// The block size must match the block size of this store's layout.
unique_ptr<DataNode> DataNodeStore::load(unique_ptr<Block> block) {
  assert(block->size() == _layout.blocksizeBytes());
  DataNodeView node(std::move(block));

  if (*node.Depth() == 0) {
    return unique_ptr<DataLeafNode>(new DataLeafNode(std::move(node)));
  } else if (*node.Depth() <= MAX_DEPTH) {
    return unique_ptr<DataInnerNode>(new DataInnerNode(std::move(node)));
  } else {
    throw runtime_error("Tree is to deep. Data corruption?");
  }
}

// Creates a new block in the block store and initializes it as an inner node
// with first_child passed to DataInnerNode::InitializeNewNode.
unique_ptr<DataInnerNode> DataNodeStore::createNewInnerNode(const DataNode &first_child) {
  // This might be violated if source is from a different DataNodeStore
  assert(first_child.node().layout().blocksizeBytes() == _layout.blocksizeBytes());
  auto block = _blockstore->create(_layout.blocksizeBytes());
  return DataInnerNode::InitializeNewNode(std::move(block), first_child);
}

// Creates a new block in the block store and initializes it as a leaf node.
unique_ptr<DataLeafNode> DataNodeStore::createNewLeafNode() {
  auto block = _blockstore->create(_layout.blocksizeBytes());
  return DataLeafNode::InitializeNewNode(std::move(block));
}

// Loads the node stored under the given key.
// Returns nullptr if the block store returns no block for this key.
unique_ptr<DataNode> DataNodeStore::load(const Key &key) {
  auto block = _blockstore->load(key);
  if (block == nullptr) {
    return nullptr;
  }
  return load(std::move(block));
}

// Copies the block of source into a newly created block and returns the node
// loaded from that new block.
unique_ptr<DataNode> DataNodeStore::createNewNodeAsCopyFrom(const DataNode &source) {
  // This might be violated if source is from a different DataNodeStore
  assert(source.node().layout().blocksizeBytes() == _layout.blocksizeBytes());
  auto newBlock = blockstore::utils::copyToNewBlock(_blockstore.get(), source.node().block());
  return load(std::move(newBlock));
}

// Overwrites the block of target with the contents of the block of source and
// returns the node re-loaded from target's key. The target node object is
// destroyed in the process.
unique_ptr<DataNode> DataNodeStore::overwriteNodeWith(unique_ptr<DataNode> target, const DataNode &source) {
  assert(target->node().layout().blocksizeBytes() == _layout.blocksizeBytes());
  assert(source.node().layout().blocksizeBytes() == _layout.blocksizeBytes());
  Key key = target->key();
  {
    // The inner scope destroys targetBlock before the node is loaded again
    // by key below.
    auto targetBlock = target->node().releaseBlock();
    target.reset();
    blockstore::utils::copyTo(targetBlock.get(), source.node().block());
  }
  return load(key);
}

// Removes the block backing the given node from the block store.
// The node must not be null.
void DataNodeStore::remove(unique_ptr<DataNode> node) {
  auto block = node->node().releaseBlock();
  node.reset();
  _blockstore->remove(std::move(block));
}

// Returns the number of blocks reported by the underlying block store.
uint64_t DataNodeStore::numNodes() const {
  return _blockstore->numBlocks();
}

// Removes the given node and, if it is an inner node, recursively all nodes
// below it. Children are removed before their parent.
// Throws std::runtime_error if a child referenced by an inner node cannot be
// loaded from the block store.
void DataNodeStore::removeSubtree(unique_ptr<DataNode> node) {
  DataInnerNode *inner = dynamic_cast<DataInnerNode*>(node.get());
  if (inner != nullptr) {
    // Use the count's own type for the index to avoid a signed/unsigned comparison.
    const auto numChildren = inner->numChildren();
    for (auto i = decltype(numChildren){0}; i < numChildren; ++i) {
      auto child = load(inner->getChild(i)->key());
      if (child == nullptr) {
        // load(key) returns nullptr for a missing block; passing that on
        // would dereference a null pointer in remove().
        throw runtime_error("Child node not found while removing subtree. Data corruption?");
      }
      removeSubtree(std::move(child));
    }
  }
  remove(std::move(node));
}

// Returns a copy of the node layout used by this store.
DataNodeLayout DataNodeStore::layout() const {
  return _layout;
}

}  // namespace datanodestore
}  // namespace onblocks
}  // namespace blobstore