Started rewriting traversal

This commit is contained in:
Sebastian Messmer 2016-07-10 22:57:39 +02:00
parent 2bc3b641aa
commit 98b85ea8b6
10 changed files with 256 additions and 94 deletions

View File

@ -11,6 +11,7 @@ set(SOURCES
implementations/onblocks/datanodestore/DataInnerNode.cpp
implementations/onblocks/datanodestore/DataNodeStore.cpp
implementations/onblocks/datatreestore/impl/algorithms.cpp
implementations/onblocks/datatreestore/impl/LeafTraverser.cpp
implementations/onblocks/datatreestore/DataTree.cpp
implementations/onblocks/datatreestore/DataTreeStore.cpp
implementations/onblocks/BlobOnBlocks.cpp

View File

@ -37,21 +37,30 @@ void BlobOnBlocks::resize(uint64_t numBytes) {
_sizeCache = numBytes;
}
void BlobOnBlocks::traverseLeaves(uint64_t beginByte, uint64_t sizeBytes, function<void (uint64_t, DataLeafNode *leaf, uint32_t, uint32_t)> func) const {
void BlobOnBlocks::traverseLeaves(uint64_t beginByte, uint64_t sizeBytes, function<void (uint64_t leafOffset, DataLeafNode *leaf, uint32_t begin, uint32_t count)> onExistingLeaf, function<Data (uint64_t leafOffset, uint32_t count)> onCreateLeaf) const {
uint64_t endByte = beginByte + sizeBytes;
uint32_t firstLeaf = beginByte / _datatree->maxBytesPerLeaf();
uint32_t endLeaf = utils::ceilDivision(endByte, _datatree->maxBytesPerLeaf());
bool writingOutside = size() < endByte; // TODO Calling size() is slow because it has to traverse the tree
_datatree->traverseLeaves(firstLeaf, endLeaf, [&func, beginByte, endByte, endLeaf, writingOutside](DataLeafNode *leaf, uint32_t leafIndex) {
uint64_t indexOfFirstLeafByte = leafIndex * leaf->maxStoreableBytes();
bool writingOutside = size() < endByte; // TODO Calling size() is slow because it has to traverse the tree. Instead: recognize situation by looking at current leaf size in lambda below
uint64_t maxBytesPerLeaf = _datatree->maxBytesPerLeaf();
auto _onExistingLeaf = [&onExistingLeaf, beginByte, endByte, endLeaf, writingOutside, maxBytesPerLeaf] (uint32_t leafIndex, DataLeafNode *leaf) {
uint64_t indexOfFirstLeafByte = leafIndex * maxBytesPerLeaf;
uint32_t dataBegin = utils::maxZeroSubtraction(beginByte, indexOfFirstLeafByte);
uint32_t dataEnd = std::min(leaf->maxStoreableBytes(), endByte - indexOfFirstLeafByte);
uint32_t dataEnd = std::min(maxBytesPerLeaf, endByte - indexOfFirstLeafByte);
if (leafIndex == endLeaf-1 && writingOutside) {
// If we are traversing an area that didn't exist before, then the last leaf was just created with a wrong size. We have to fix it.
leaf->resize(dataEnd);
}
func(indexOfFirstLeafByte, leaf, dataBegin, dataEnd-dataBegin);
});
onExistingLeaf(indexOfFirstLeafByte, leaf, dataBegin, dataEnd-dataBegin);
};
auto _onCreateLeaf = [&onCreateLeaf, maxBytesPerLeaf, endByte] (uint32_t leafIndex) -> Data {
uint64_t indexOfFirstLeafByte = leafIndex * maxBytesPerLeaf;
uint32_t dataEnd = std::min(maxBytesPerLeaf, endByte - indexOfFirstLeafByte);
auto data = onCreateLeaf(indexOfFirstLeafByte, dataEnd);
ASSERT(data.size() == dataEnd, "Returned leaf data with wrong size");
return data;
};
_datatree->traverseLeaves(firstLeaf, endLeaf, _onExistingLeaf, _onCreateLeaf);
if (writingOutside) {
ASSERT(_datatree->numStoredBytes() == endByte, "Writing didn't grow by the correct number of bytes");
_sizeCache = endByte;
@ -80,17 +89,27 @@ uint64_t BlobOnBlocks::tryRead(void *target, uint64_t offset, uint64_t count) co
}
void BlobOnBlocks::_read(void *target, uint64_t offset, uint64_t count) const {
traverseLeaves(offset, count, [target, offset] (uint64_t indexOfFirstLeafByte, const DataLeafNode *leaf, uint32_t leafDataOffset, uint32_t leafDataSize) {
auto onExistingLeaf = [target, offset] (uint64_t indexOfFirstLeafByte, const DataLeafNode *leaf, uint32_t leafDataOffset, uint32_t leafDataSize) {
//TODO Simplify formula, make it easier to understand
leaf->read((uint8_t*)target + indexOfFirstLeafByte - offset + leafDataOffset, leafDataOffset, leafDataSize);
});
};
auto onCreateLeaf = [] (uint64_t /*indexOfFirstLeafByte*/, uint32_t /*count*/) -> Data {
ASSERT(false, "Reading shouldn't create new leaves.");
};
traverseLeaves(offset, count, onExistingLeaf, onCreateLeaf);
}
void BlobOnBlocks::write(const void *source, uint64_t offset, uint64_t count) {
traverseLeaves(offset, count, [source, offset] (uint64_t indexOfFirstLeafByte, DataLeafNode *leaf, uint32_t leafDataOffset, uint32_t leafDataSize) {
auto onExistingLeaf = [source, offset] (uint64_t indexOfFirstLeafByte, DataLeafNode *leaf, uint32_t leafDataOffset, uint32_t leafDataSize) {
//TODO Simplify formula, make it easier to understand
leaf->write((uint8_t*)source + indexOfFirstLeafByte - offset + leafDataOffset, leafDataOffset, leafDataSize);
});
};
auto onCreateLeaf = [source, offset] (uint64_t indexOfFirstLeafByte, uint32_t count) -> Data {
Data result(count);
std::memcpy(result.data(), (uint8_t*)source + indexOfFirstLeafByte - offset, count);
return result;
};
traverseLeaves(offset, count, onExistingLeaf, onCreateLeaf);
}
void BlobOnBlocks::flush() {

View File

@ -38,7 +38,7 @@ public:
private:
void _read(void *target, uint64_t offset, uint64_t count) const;
void traverseLeaves(uint64_t offsetBytes, uint64_t sizeBytes, std::function<void (uint64_t, datanodestore::DataLeafNode *, uint32_t, uint32_t)>) const;
void traverseLeaves(uint64_t offsetBytes, uint64_t sizeBytes, std::function<void (uint64_t leafOffset, datanodestore::DataLeafNode *leaf, uint32_t begin, uint32_t count)> onExistingLeaf, std::function<cpputils::Data (uint64_t leafOffset, uint32_t count)> onCreateLeaf) const;
cpputils::unique_ref<parallelaccessdatatreestore::DataTreeRef> _datatree;
mutable boost::optional<uint64_t> _sizeCache;

View File

@ -11,6 +11,7 @@
#include <cpp-utils/pointer/optional_ownership_ptr.h>
#include <cmath>
#include <cpp-utils/assert/assert.h>
#include "impl/LeafTraverser.h"
using blockstore::Key;
using blobstore::onblocks::datanodestore::DataNodeStore;
@ -172,22 +173,27 @@ uint32_t DataTree::_computeNumLeaves(const DataNode &node) const {
void DataTree::traverseLeaves(uint32_t beginIndex, uint32_t endIndex, std::function<void (uint32_t index, datanodestore::DataLeafNode* leaf)> onExistingLeaf, std::function<cpputils::Data (uint32_t index)> onCreateLeaf) {
//TODO Can we traverse in parallel?
boost::upgrade_lock<shared_mutex> lock(_mutex); //TODO Rethink locking here. We probably need locking when the traverse resizes the blob. Otherwise, parallel traverse should be possible. We already allow it below by freeing the upgrade_lock, but we currently only allow it if ALL traverses are entirely inside the valid region. Can we allow more parallelity?
auto exclusiveLock = std::make_unique<boost::upgrade_to_unique_lock<shared_mutex>>(lock);
std::unique_lock<shared_mutex> lock(_mutex); //TODO Rethink locking here. We probably need locking when the traverse resizes the blob. Otherwise, parallel traverse should be possible. We already allow it below by freeing the upgrade_lock, but we currently only allow it if ALL traverses are entirely inside the valid region. Can we allow more parallelity?
ASSERT(beginIndex <= endIndex, "Invalid parameters");
if (0 == endIndex) {
// In this case the utils::ceilLog(_, endIndex) below would fail
return;
}
//TODO Alternative: Increase depth when necessary at the end of _traverseExistingSubtree, when index goes on after last possible one.
uint8_t neededTreeDepth = utils::ceilLog(_nodeStore->layout().maxChildrenPerInnerNode(), (uint64_t)endIndex);
uint32_t numLeaves = this->_numLeaves(); // TODO Querying the size could cause a tree traversal down to the leaves. Possible without querying the size? If yes, we probably don't need _numLeavesCache anymore, because its only meaning is to keep the value when a lot of parallel write()/read() syscalls happen.
if (_rootNode->depth() < neededTreeDepth) {
//TODO Test cases that actually increase it here by 0 level / 1 level / more than 1 level
increaseTreeDepth(neededTreeDepth - _rootNode->depth());
}
if (numLeaves <= beginIndex) {
LeafTraverser(_nodeStore).traverse(_rootNode.get(), beginIndex, endIndex, onExistingLeaf, onCreateLeaf);
if (_numLeavesCache != none && *_numLeavesCache < endIndex) {
_numLeavesCache = endIndex;
}
/*if (numLeaves <= beginIndex) {
//TODO Test cases with numLeaves < / >= beginIndex
// There is a gap between the current size and the begin of the traversal
auto _onExistingLeaf = [numLeaves, &onExistingLeaf, this](uint32_t index, DataLeafNode* node) {
@ -224,66 +230,7 @@ void DataTree::traverseLeaves(uint32_t beginIndex, uint32_t endIndex, std::funct
//We are traversing entirely inside the valid region
exclusiveLock.reset(); // we can allow parallel traverses, if all are entirely inside the valid region.
_traverseLeaves(_rootNode.get(), 0, beginIndex, endIndex, onExistingLeaf, onCreateLeaf);
}
}
void DataTree::_traverseLeaves(DataNode *root, uint32_t leafOffset, uint32_t beginIndex, uint32_t endIndex, std::function<void (uint32_t index, datanodestore::DataLeafNode* leaf)> onExistingLeaf, std::function<cpputils::Data (uint32_t index)> onCreateLeaf) {
DataLeafNode *leaf = dynamic_cast<DataLeafNode*>(root);
if (leaf != nullptr) {
ASSERT(beginIndex <= 1 && endIndex <= 1, "If root node is a leaf, the (sub)tree has only one leaf - access indices must be 0 or 1.");
if (beginIndex == 0 && endIndex == 1) {
onExistingLeaf(leafOffset, leaf);
}
return;
}
DataInnerNode *inner = dynamic_cast<DataInnerNode*>(root);
uint32_t leavesPerChild = leavesPerFullChild(*inner);
uint32_t beginChild = beginIndex/leavesPerChild;
uint32_t endChild = utils::ceilDivision(endIndex, leavesPerChild);
for (uint32_t childIndex = beginChild; childIndex < std::min(inner->numChildren(), endChild); ++childIndex) {
auto child = _nodeStore->load(inner->getChild(childIndex)->key());
ASSERT(child != none, "Couldn't load child node");
uint32_t childOffset = childIndex * leavesPerChild;
uint32_t localBeginIndex = utils::maxZeroSubtraction(beginIndex, childOffset);
uint32_t localEndIndex = std::min(leavesPerChild, endIndex - childOffset);
_traverseLeaves(child->get(), leafOffset + childOffset, localBeginIndex, localEndIndex, onExistingLeaf, onCreateLeaf);
}
for (uint32_t childIndex = inner->numChildren(); childIndex < endChild; ++childIndex) {
uint32_t childOffset = childIndex * leavesPerChild;
uint32_t localEndIndex = std::min(leavesPerChild, endIndex - childOffset);
auto child = _createSubtree(leafOffset, localEndIndex, onCreateLeaf);
inner->addChild(child.key());
}
}
unique_ref<DataNode> DataTree::_createSubtree(uint32_t leafOffset, uint32_t numLeaves, std::function<cpputils::Data (uint32_t index)> onCreateLeaf) {
if (numLeaves == 1) {
auto data = onCreateLeaf(leafOffset);
ASSERT(data.size() <= _nodeStore->layout().maxBytesPerLeaf(), "Too much data for a leaf");
//TODO More efficient by _nodeStore->createNewLeafNode(data);
auto leaf = _nodeStore->createNewLeafNode();
leaf->resize(data.size());
leaf->write(data.data(), 0, data.size());
return leaf;
} else {
uint32_t numLeafGroups = utils::ceilDivision(numLeaves, _nodeStore->layout().maxChildrenPerInnerNode());
vector<unique_ref<DataNode>> children;
children.reserve(numLeafGroups);
for (uint32_t i = 0; i < numLeafGroups; ++i) {
children.push_back(_createSubtree())
...
}
}
}
unique_ref<DataNode> DataTree::addChildTo(DataInnerNode *node) {
auto new_leaf = _nodeStore->createNewLeafNode();
new_leaf->resize(_nodeStore->layout().maxBytesPerLeaf());
auto chain = createChainOfInnerNodes(node->depth()-1, std::move(new_leaf));
node->addChild(*chain);
return std::move(chain);
}*/
}
uint32_t DataTree::leavesPerFullChild(const DataInnerNode &root) const {

View File

@ -62,7 +62,6 @@ private:
void ifRootHasOnlyOneChildReplaceRootWithItsChild();
//TODO Use underscore for private methods
void _traverseLeaves(datanodestore::DataNode *root, uint32_t leafOffset, uint32_t beginIndex, uint32_t endIndex, std::function<void (uint32_t index, datanodestore::DataLeafNode* leaf)> onExistingLeaf, std::function<cpputils::Data (uint32_t index)> onCreateLeaf);
uint32_t leavesPerFullChild(const datanodestore::DataInnerNode &root) const;
uint64_t _numStoredBytes() const;
uint64_t _numStoredBytes(const datanodestore::DataNode &root) const;
@ -71,8 +70,6 @@ private:
cpputils::optional_ownership_ptr<datanodestore::DataLeafNode> LastLeaf(datanodestore::DataNode *root);
cpputils::unique_ref<datanodestore::DataLeafNode> LastLeaf(cpputils::unique_ref<datanodestore::DataNode> root);
datanodestore::DataInnerNode* increaseTreeDepth(unsigned int levels);
cpputils::unique_ref<datanodestore::DataNode> _createSubtree(uint32_t leafOffset, uint32_t numLeaves, std::function<cpputils::Data (uint32_t index)> onCreateLeaf);
cpputils::unique_ref<datanodestore::DataNode> addChildTo(datanodestore::DataInnerNode *node);
DISALLOW_COPY_AND_ASSIGN(DataTree);
};

View File

@ -0,0 +1,147 @@
#include "LeafTraverser.h"
#include <cpp-utils/assert/assert.h>
#include "../../datanodestore/DataLeafNode.h"
#include "../../datanodestore/DataInnerNode.h"
#include "../../datanodestore/DataNodeStore.h"
#include "../../utils/Math.h"
using std::function;
using std::vector;
using boost::none;
using cpputils::Data;
using cpputils::unique_ref;
using blobstore::onblocks::datanodestore::DataNodeStore;
using blobstore::onblocks::datanodestore::DataNode;
using blobstore::onblocks::datanodestore::DataInnerNode;
using blobstore::onblocks::datanodestore::DataLeafNode;
namespace blobstore {
namespace onblocks {
namespace datatreestore {
LeafTraverser::LeafTraverser(DataNodeStore *nodeStore)
: _nodeStore(nodeStore) {
}
void LeafTraverser::traverse(DataNode *root, uint32_t beginIndex, uint32_t endIndex, function<void (uint32_t index, DataLeafNode* leaf)> onExistingLeaf, function<Data (uint32_t index)> onCreateLeaf) {
ASSERT(beginIndex <= endIndex, "Invalid parameters");
_traverseExistingSubtree(root, beginIndex, endIndex, 0, false, onExistingLeaf, onCreateLeaf);
}
void LeafTraverser::_traverseExistingSubtree(DataNode *root, uint32_t beginIndex, uint32_t endIndex, uint32_t leafOffset, bool growLastExistingLeaf, function<void (uint32_t index, DataLeafNode* leaf)> onExistingLeaf, function<Data (uint32_t index)> onCreateLeaf) {
ASSERT(beginIndex <= endIndex, "Invalid parameters");
//TODO Test cases with numLeaves < / >= beginIndex
DataLeafNode *leaf = dynamic_cast<DataLeafNode*>(root);
if (leaf != nullptr) {
ASSERT(beginIndex <= 1 && endIndex <= 1, "If root node is a leaf, the (sub)tree has only one leaf - access indices must be 0 or 1.");
if (beginIndex == 0 && endIndex == 1) {
if (growLastExistingLeaf) {
leaf->resize(_nodeStore->layout().maxBytesPerLeaf());
}
onExistingLeaf(leafOffset, leaf);
}
return;
}
DataInnerNode *inner = dynamic_cast<DataInnerNode*>(root);
uint32_t leavesPerChild = _maxLeavesForTreeDepth(inner->depth()-1);
uint32_t beginChild = beginIndex/leavesPerChild;
uint32_t endChild = utils::ceilDivision(endIndex, leavesPerChild);
uint32_t numChildren = inner->numChildren();
bool shouldGrowLastExistingLeaf = growLastExistingLeaf || endChild > numChildren;
// If we traverse outside of the valid region, we still have to descend into the valid region to grow the last leaf
if (beginChild >= numChildren && shouldGrowLastExistingLeaf) {
ASSERT(beginChild > 0, "This can only happen for numChildren==0.");
auto childKey = inner->getChild(beginChild-1)->key();
auto childNode = _nodeStore->load(childKey);
if (childNode == none) {
throw std::runtime_error("Couldn't find child node "+childKey.ToString());
}
_traverseExistingSubtree(childNode->get(), leavesPerChild-1, leavesPerChild, leafOffset - 1, true,
[] (uint32_t /*index*/, DataLeafNode* /*leaf*/) {},
[] (uint32_t /*index*/) -> Data {ASSERT(false, "We only want to grow the last leaf. We shouldn't create leaves.");});
}
// Traverse existing children
for (uint32_t childIndex = beginChild; childIndex < std::min(endChild, numChildren); ++childIndex) {
auto childKey = inner->getChild(childIndex)->key();
auto childNode = _nodeStore->load(childKey);
if (childNode == none) {
throw std::runtime_error("Couldn't find child node "+childKey.ToString());
}
uint32_t childOffset = childIndex * leavesPerChild;
uint32_t localBeginIndex = utils::maxZeroSubtraction(beginIndex, childOffset);
uint32_t localEndIndex = std::min(leavesPerChild, endIndex - childOffset);
bool isLastChild = (childIndex == numChildren - 1);
_traverseExistingSubtree(childNode->get(), localBeginIndex, localEndIndex, leafOffset + childOffset, shouldGrowLastExistingLeaf && isLastChild, onExistingLeaf, onCreateLeaf);
}
// Traverse gap children (children after currently last leaf that are not traversed, i.e. before the first traversed leaf)
for (uint32_t childIndex = numChildren; childIndex < endChild; ++childIndex) {
uint32_t childOffset = childIndex * leavesPerChild;
uint32_t localEndIndex = std::min(leavesPerChild, endIndex - childOffset);
ASSERT(beginIndex <= childOffset, "Range for creating new children has to contain their first leaf.");
uint64_t maxBytesPerLeaf = _nodeStore->layout().maxBytesPerLeaf();
auto child = _createNewSubtree(localEndIndex, leafOffset + childOffset, inner->depth()-1, [maxBytesPerLeaf] (uint32_t /*index*/) {
return Data(maxBytesPerLeaf).FillWithZeroes();
});
inner->addChild(*child);
}
// Traverse new children (children after currently last leaf that are traversed)
for (uint32_t childIndex = std::max(beginChild, numChildren); childIndex < endChild; ++childIndex) {
uint32_t childOffset = childIndex * leavesPerChild;
uint32_t localEndIndex = std::min(leavesPerChild, endIndex - childOffset);
ASSERT(beginIndex <= childOffset, "Range for creating new children has to contain their first leaf.");
auto child = _createNewSubtree(localEndIndex, leafOffset + childOffset, inner->depth()-1, onCreateLeaf);
inner->addChild(*child);
}
}
unique_ref<DataNode> LeafTraverser::_createNewSubtree(uint32_t numLeaves, uint32_t leafOffset, uint8_t depth, function<Data (uint32_t index)> onCreateLeaf) {
ASSERT(depth > 0, "Wrong depth given");
if (1 == depth) {
ASSERT(numLeaves == 1, "With depth 1, we can only create one leaf.");
auto data = onCreateLeaf(leafOffset);
// TODO Performance: Directly create with data.
auto node = _nodeStore->createNewLeafNode();
node->resize(data.size());
node->write(data.data(), 0, data.size());
return node;
}
uint8_t minNeededDepth = utils::ceilLog(_nodeStore->layout().maxChildrenPerInnerNode(), (uint64_t)numLeaves);
ASSERT(depth >= minNeededDepth, "Given tree depth doesn't fit given number of leaves to create.");
uint32_t leavesPerChild = _maxLeavesForTreeDepth(depth-1);
uint32_t endChild = utils::ceilDivision(numLeaves, leavesPerChild);
vector<unique_ref<DataNode>> children;
children.reserve(endChild);
for(uint32_t childIndex = 0; childIndex < endChild; ++childIndex) {
uint32_t childOffset = childIndex * leavesPerChild;
uint32_t localNumLeaves = std::min(leavesPerChild, numLeaves - childOffset);
auto child = _createNewSubtree(localNumLeaves, leafOffset + childOffset, depth - 1, onCreateLeaf);
children.push_back(std::move(child));
}
ASSERT(children.size() > 0, "No children created");
//TODO Performance: Directly create inner node with all children
auto newNode = _nodeStore->createNewInnerNode(*children[0]);
for (auto childIter = children.begin()+1; childIter != children.end(); ++childIter) {
newNode->addChild(**childIter);
}
return newNode;
}
uint32_t LeafTraverser::_maxLeavesForTreeDepth(uint8_t depth) {
return utils::intPow(_nodeStore->layout().maxChildrenPerInnerNode(), (uint64_t)depth);
}
}
}
}

View File

@ -0,0 +1,45 @@
#pragma once
#ifndef MESSMER_BLOBSTORE_IMPLEMENTATIONS_ONBLOCKS_IMPL_LEAFTRAVERSER_H_
#define MESSMER_BLOBSTORE_IMPLEMENTATIONS_ONBLOCKS_IMPL_LEAFTRAVERSER_H_
#include <cpp-utils/macros.h>
#include <cpp-utils/pointer/unique_ref.h>
#include <cpp-utils/data/Data.h>
namespace blobstore {
namespace onblocks {
namespace datanodestore {
class DataNodeStore;
class DataNode;
class DataLeafNode;
class DataInnerNode;
}
namespace datatreestore {
/**
* LeafTraverser can create leaves if they don't exist yet (i.e. endIndex > numLeaves), but
* it cannot increase the tree depth. That is, the tree has to be deep enough to allow
* creating the number of leaves.
*/
class LeafTraverser final {
public:
LeafTraverser(datanodestore::DataNodeStore *nodeStore);
void traverse(datanodestore::DataNode *root, uint32_t beginIndex, uint32_t endIndex, std::function<void (uint32_t index, datanodestore::DataLeafNode* leaf)> onExistingLeaf, std::function<cpputils::Data (uint32_t index)> onCreateLeaf);
private:
datanodestore::DataNodeStore *_nodeStore;
void _traverseExistingSubtree(datanodestore::DataNode *root, uint32_t beginIndex, uint32_t endIndex, uint32_t leafOffset, bool growLastExistingLeaf, std::function<void (uint32_t index, datanodestore::DataLeafNode* leaf)> onExistingLeaf, std::function<cpputils::Data (uint32_t index)> onCreateLeaf);
cpputils::unique_ref<datanodestore::DataNode> _createNewSubtree(uint32_t numLeaves, uint32_t leafOffset, uint8_t depth, std::function<cpputils::Data (uint32_t index)> onCreateLeaf);
uint32_t _maxLeavesForTreeDepth(uint8_t depth);
DISALLOW_COPY_AND_ASSIGN(LeafTraverser);
};
}
}
}
#endif

View File

@ -21,8 +21,8 @@ public:
return _baseTree->maxBytesPerLeaf();
}
void traverseLeaves(uint32_t beginIndex, uint32_t endIndex, std::function<void (datanodestore::DataLeafNode*, uint32_t)> func) {
return _baseTree->traverseLeaves(beginIndex, endIndex, func);
void traverseLeaves(uint32_t beginIndex, uint32_t endIndex, std::function<void (uint32_t index, datanodestore::DataLeafNode* leaf)> onExistingLeaf, std::function<cpputils::Data (uint32_t index)> onCreateLeaf) {
return _baseTree->traverseLeaves(beginIndex, endIndex, onExistingLeaf, onCreateLeaf);
}
uint32_t numLeaves() const {

View File

@ -111,7 +111,7 @@ public:
uint32_t oldNumLeaves = tree->numLeaves();
uint32_t newNumLeaves = oldNumLeaves + numLeavesToAdd;
//TODO Test cases where beginIndex is inside of the existing leaves
tree->traverseLeaves(newNumLeaves-1, newNumLeaves, [] (DataLeafNode*,uint32_t){});
tree->traverseLeaves(newNumLeaves-1, newNumLeaves, [] (uint32_t, DataLeafNode*){}, [] (uint32_t count) -> Data { return Data(count).FillWithZeroes();});
tree->flush();
}

View File

@ -10,10 +10,13 @@ using blobstore::onblocks::datatreestore::DataTree;
using blockstore::Key;
using cpputils::unique_ref;
using cpputils::Data;
using std::shared_ptr;
class TraversorMock {
public:
MOCK_METHOD2(called, void(DataLeafNode*, uint32_t));
MOCK_METHOD2(calledExistingLeaf, void(DataLeafNode*, uint32_t));
MOCK_METHOD1(calledCreateLeaf, shared_ptr<Data>(uint32_t));
};
MATCHER_P(KeyEq, expected, "node key equals") {
@ -43,7 +46,7 @@ public:
}
void EXPECT_TRAVERSE_LEAF(const Key &key, uint32_t leafIndex) {
EXPECT_CALL(traversor, called(KeyEq(key), leafIndex)).Times(1);
EXPECT_CALL(traversor, calledExistingLeaf(KeyEq(key), leafIndex)).Times(1);
}
void EXPECT_TRAVERSE_ALL_CHILDREN_OF(const DataInnerNode &node, uint32_t firstLeafIndex) {
@ -53,14 +56,17 @@ public:
}
void EXPECT_DONT_TRAVERSE_ANY_LEAVES() {
EXPECT_CALL(traversor, called(_, _)).Times(0);
EXPECT_CALL(traversor, calledExistingLeaf(_, _)).Times(0);
EXPECT_CALL(traversor, calledCreateLeaf(_)).Times(0);
}
void TraverseLeaves(DataNode *root, uint32_t beginIndex, uint32_t endIndex) {
root->flush();
auto tree = treeStore.load(root->key()).value();
tree->traverseLeaves(beginIndex, endIndex, [this] (DataLeafNode *leaf, uint32_t nodeIndex) {
traversor.called(leaf, nodeIndex);
tree->traverseLeaves(beginIndex, endIndex, [this] (uint32_t nodeIndex, DataLeafNode *leaf) {
traversor.calledExistingLeaf(leaf, nodeIndex);
}, [this] (uint32_t nodeIndex) -> Data {
return traversor.calledCreateLeaf(nodeIndex)->copy();
});
}