Use LeafTraversor for resizing blobs

This commit is contained in:
Sebastian Messmer 2016-07-12 20:36:12 +02:00
parent a19d79361c
commit 845b0b5239
8 changed files with 86 additions and 145 deletions

View File

@ -88,7 +88,8 @@ Data BlobOnBlocks::readAll() const {
} }
void BlobOnBlocks::read(void *target, uint64_t offset, uint64_t count) const { void BlobOnBlocks::read(void *target, uint64_t offset, uint64_t count) const {
ASSERT(offset <= size() && offset + count <= size(), "BlobOnBlocks::read() read outside blob. Use BlobOnBlocks::tryRead() if this should be allowed."); uint64_t _size = size();
ASSERT(offset <= _size && offset + count <= _size, "BlobOnBlocks::read() read outside blob. Use BlobOnBlocks::tryRead() if this should be allowed.");
uint64_t read = tryRead(target, offset, count); uint64_t read = tryRead(target, offset, count);
ASSERT(read == count, "BlobOnBlocks::read() couldn't read all requested bytes. Use BlobOnBlocks::tryRead() if this should be allowed."); ASSERT(read == count, "BlobOnBlocks::read() couldn't read all requested bytes. Use BlobOnBlocks::tryRead() if this should be allowed.");
} }

View File

@ -77,6 +77,16 @@ void DataInnerNode::addChild(const DataNode &child) {
LastChild()->setKey(child.key()); LastChild()->setKey(child.key());
} }
void DataInnerNode::reduceNumChildren(uint32_t newNumChildren) {
ASSERT(node().Size() >= newNumChildren, "New num children given to reduceNumChildren() is larger than old num children.");
if (node().Size() != newNumChildren) {
for (auto entry = ChildrenBegin() + newNumChildren; entry != ChildrenEnd(); ++entry) {
entry->setKey(Key::Null());
}
node().setSize(newNumChildren);
}
}
void DataInnerNode::removeLastChild() { void DataInnerNode::removeLastChild() {
ASSERT(node().Size() > 1, "There is no child to remove"); ASSERT(node().Size() > 1, "There is no child to remove");
node().setSize(node().Size()-1); node().setSize(node().Size()-1);

View File

@ -26,6 +26,7 @@ public:
uint32_t numChildren() const; uint32_t numChildren() const;
void addChild(const DataNode &child_key); void addChild(const DataNode &child_key);
void reduceNumChildren(uint32_t newNumChildren);
void removeLastChild(); void removeLastChild();

View File

@ -46,83 +46,17 @@ DataTree::DataTree(DataNodeStore *nodeStore, unique_ref<DataNode> rootNode)
DataTree::~DataTree() { DataTree::~DataTree() {
} }
void DataTree::removeLastDataLeaf() { void DataTree::whileRootHasOnlyOneChildReplaceRootWithItsChild() {
auto deletePosOrNull = algorithms::GetLowestRightBorderNodeWithMoreThanOneChildOrNull(_nodeStore, _rootNode.get());
ASSERT(deletePosOrNull.get() != nullptr, "Tree has only one leaf, can't shrink it.");
deleteLastChildSubtree(deletePosOrNull.get());
ifRootHasOnlyOneChildReplaceRootWithItsChild();
}
void DataTree::ifRootHasOnlyOneChildReplaceRootWithItsChild() {
DataInnerNode *rootNode = dynamic_cast<DataInnerNode*>(_rootNode.get()); DataInnerNode *rootNode = dynamic_cast<DataInnerNode*>(_rootNode.get());
ASSERT(rootNode != nullptr, "RootNode is not an inner node"); if (rootNode != nullptr && rootNode->numChildren() == 1) {
if (rootNode->numChildren() == 1) {
auto child = _nodeStore->load(rootNode->getChild(0)->key()); auto child = _nodeStore->load(rootNode->getChild(0)->key());
ASSERT(child != none, "Couldn't load first child of root node"); ASSERT(child != none, "Couldn't load first child of root node");
_rootNode = _nodeStore->overwriteNodeWith(std::move(_rootNode), **child); _rootNode = _nodeStore->overwriteNodeWith(std::move(_rootNode), **child);
_nodeStore->remove(std::move(*child)); _nodeStore->remove(std::move(*child));
whileRootHasOnlyOneChildReplaceRootWithItsChild();
} }
} }
void DataTree::deleteLastChildSubtree(DataInnerNode *node) {
auto lastChild = _nodeStore->load(node->LastChild()->key());
ASSERT(lastChild != none, "Couldn't load last child");
_nodeStore->removeSubtree(std::move(*lastChild));
node->removeLastChild();
}
unique_ref<DataLeafNode> DataTree::addDataLeaf() {
auto insertPosOrNull = algorithms::GetLowestInnerRightBorderNodeWithLessThanKChildrenOrNull(_nodeStore, _rootNode.get());
if (insertPosOrNull) {
return addDataLeafAt(insertPosOrNull.get());
} else {
return addDataLeafToFullTree();
}
}
unique_ref<DataLeafNode> DataTree::addDataLeafAt(DataInnerNode *insertPos) {
auto new_leaf = _nodeStore->createNewLeafNode();
auto chain = createChainOfInnerNodes(insertPos->depth()-1, new_leaf.get());
insertPos->addChild(*chain);
return new_leaf;
}
optional_ownership_ptr<DataNode> DataTree::createChainOfInnerNodes(unsigned int num, DataNode *child) {
//TODO This function is implemented twice, once with optional_ownership_ptr, once with unique_ref. Redundancy!
optional_ownership_ptr<DataNode> chain = cpputils::WithoutOwnership<DataNode>(child);
for(unsigned int i=0; i<num; ++i) {
auto newnode = _nodeStore->createNewInnerNode(*chain);
chain = cpputils::WithOwnership<DataNode>(std::move(newnode));
}
return chain;
}
unique_ref<DataNode> DataTree::createChainOfInnerNodes(unsigned int num, unique_ref<DataNode> child) {
unique_ref<DataNode> chain = std::move(child);
for(unsigned int i=0; i<num; ++i) {
chain = _nodeStore->createNewInnerNode(*chain);
}
return chain;
}
DataInnerNode* DataTree::increaseTreeDepth(unsigned int levels) {
ASSERT(levels >= 1, "Parameter out of bounds: tried to increase tree depth by zero.");
auto copyOfOldRoot = _nodeStore->createNewNodeAsCopyFrom(*_rootNode);
auto chain = createChainOfInnerNodes(levels-1, copyOfOldRoot.get());
auto newRootNode = DataNode::convertToNewInnerNode(std::move(_rootNode), *chain);
DataInnerNode *result = newRootNode.get();
_rootNode = std::move(newRootNode);
return result;
}
unique_ref<DataLeafNode> DataTree::addDataLeafToFullTree() {
DataInnerNode *rootNode = increaseTreeDepth(1);
auto newLeaf = addDataLeafAt(rootNode);
return newLeaf;
}
const Key &DataTree::key() const { const Key &DataTree::key() const {
return _rootNode->key(); return _rootNode->key();
} }
@ -171,18 +105,27 @@ uint32_t DataTree::_computeNumLeaves(const DataNode &node) const {
return numLeavesInLeftChildren + numLeavesInRightChild; return numLeavesInLeftChildren + numLeavesInRightChild;
} }
void DataTree::traverseLeaves(uint32_t beginIndex, uint32_t endIndex, std::function<void (uint32_t index, datanodestore::DataLeafNode* leaf)> onExistingLeaf, std::function<cpputils::Data (uint32_t index)> onCreateLeaf) { void DataTree::traverseLeaves(uint32_t beginIndex, uint32_t endIndex, function<void (uint32_t index, DataLeafNode* leaf)> onExistingLeaf, function<Data (uint32_t index)> onCreateLeaf) {
//TODO Can we allow multiple runs of traverseLeaves() in parallel? //TODO Can we allow multiple runs of traverseLeaves() in parallel? Also in parallel with resizeNumBytes()?
std::unique_lock<shared_mutex> lock(_mutex); std::unique_lock<shared_mutex> lock(_mutex);
ASSERT(beginIndex <= endIndex, "Invalid parameters"); ASSERT(beginIndex <= endIndex, "Invalid parameters");
_rootNode = LeafTraverser(_nodeStore).traverseAndReturnRoot(std::move(_rootNode), beginIndex, endIndex, onExistingLeaf, onCreateLeaf); auto onBacktrackFromSubtree = [] (DataInnerNode* /*node*/) {};
_traverseLeaves(beginIndex, endIndex, onExistingLeaf, onCreateLeaf, onBacktrackFromSubtree);
if (_numLeavesCache != none && *_numLeavesCache < endIndex) { if (_numLeavesCache != none && *_numLeavesCache < endIndex) {
_numLeavesCache = endIndex; _numLeavesCache = endIndex;
} }
} }
void DataTree::_traverseLeaves(uint32_t beginIndex, uint32_t endIndex,
function<void (uint32_t index, DataLeafNode* leaf)> onExistingLeaf,
function<Data (uint32_t index)> onCreateLeaf,
function<void (DataInnerNode *node)> onBacktrackFromSubtree) {
_rootNode = LeafTraverser(_nodeStore).traverseAndReturnRoot(std::move(_rootNode), beginIndex, endIndex, onExistingLeaf, onCreateLeaf, onBacktrackFromSubtree);
}
uint32_t DataTree::leavesPerFullChild(const DataInnerNode &root) const { uint32_t DataTree::leavesPerFullChild(const DataInnerNode &root) const {
return utils::intPow(_nodeStore->layout().maxChildrenPerInnerNode(), (uint64_t)root.depth()-1); return utils::intPow(_nodeStore->layout().maxChildrenPerInnerNode(), (uint64_t)root.depth()-1);
} }
@ -212,56 +155,35 @@ uint64_t DataTree::_numStoredBytes(const DataNode &root) const {
} }
void DataTree::resizeNumBytes(uint64_t newNumBytes) { void DataTree::resizeNumBytes(uint64_t newNumBytes) {
//TODO Use LeafTraverser here. For growing, that should be trivial. Maybe there is even a way to make it for shrinking? Maybe a callback for inner nodes? std::unique_lock<shared_mutex> lock(_mutex); // TODO Multiple ones in parallel? Also in parallel with traverseLeaves()?
//TODO Can we resize in parallel? Especially creating new blocks (i.e. encrypting them) is expensive and should be done in parallel.
boost::upgrade_lock<shared_mutex> lock(_mutex);
{
boost::upgrade_to_unique_lock<shared_mutex> exclusiveLock(lock);
//TODO Faster implementation possible (no addDataLeaf()/removeLastDataLeaf() in a loop, but directly resizing)
LastLeaf(_rootNode.get())->resize(_nodeStore->layout().maxBytesPerLeaf());
uint64_t currentNumBytes = _numStoredBytes();
ASSERT(currentNumBytes % _nodeStore->layout().maxBytesPerLeaf() == 0, "The last leaf is not a max data leaf, although we just resized it to be one.");
uint32_t currentNumLeaves = currentNumBytes / _nodeStore->layout().maxBytesPerLeaf();
uint32_t newNumLeaves = std::max(UINT64_C(1), utils::ceilDivision(newNumBytes, _nodeStore->layout().maxBytesPerLeaf()));
for(uint32_t i = currentNumLeaves; i < newNumLeaves; ++i) { uint32_t newNumLeaves = std::max(UINT64_C(1), utils::ceilDivision(newNumBytes, _nodeStore->layout().maxBytesPerLeaf()));
addDataLeaf()->resize(_nodeStore->layout().maxBytesPerLeaf()); uint32_t newLastLeafSize = newNumBytes - (newNumLeaves-1) * _nodeStore->layout().maxBytesPerLeaf();
} uint32_t maxChildrenPerInnerNode = _nodeStore->layout().maxChildrenPerInnerNode();
for(uint32_t i = currentNumLeaves; i > newNumLeaves; --i) { auto onExistingLeaf = [newLastLeafSize] (uint32_t /*index*/, datanodestore::DataLeafNode* leaf) {
removeLastDataLeaf(); // This is only called, if the new last leaf was already existing
} leaf->resize(newLastLeafSize);
uint32_t newLastLeafSize = newNumBytes - (newNumLeaves-1)*_nodeStore->layout().maxBytesPerLeaf(); };
LastLeaf(_rootNode.get())->resize(newLastLeafSize); auto onCreateLeaf = [newLastLeafSize] (uint32_t /*index*/) -> Data {
// This is only called, if the new last leaf was not existing yet
return Data(newLastLeafSize).FillWithZeroes();
};
auto onBacktrackFromSubtree = [newNumLeaves, maxChildrenPerInnerNode] (DataInnerNode* node) {
// This is only called for the right border nodes of the new tree.
// When growing size, the following is a no-op. When shrinking, we're deleting the children that aren't needed anymore.
uint32_t maxLeavesPerChild = utils::intPow((uint64_t)maxChildrenPerInnerNode, ((uint64_t)node->depth()-1));
uint32_t neededNodesOnChildLevel = utils::ceilDivision(newNumLeaves, maxLeavesPerChild);
uint32_t neededSiblings = utils::ceilDivision(neededNodesOnChildLevel, maxChildrenPerInnerNode);
uint32_t neededChildrenForRightBorderNode = neededNodesOnChildLevel - (neededSiblings-1) * maxChildrenPerInnerNode;
node->reduceNumChildren(neededChildrenForRightBorderNode);
};
_numLeavesCache = newNumLeaves; _traverseLeaves(newNumLeaves - 1, newNumLeaves, onExistingLeaf, onCreateLeaf, onBacktrackFromSubtree);
} whileRootHasOnlyOneChildReplaceRootWithItsChild();
_numLeavesCache = newNumLeaves;
ASSERT(newNumBytes == _numStoredBytes(), "We resized to the wrong number of bytes ("+std::to_string(numStoredBytes())+" instead of "+std::to_string(newNumBytes)+")"); ASSERT(newNumBytes == _numStoredBytes(), "We resized to the wrong number of bytes ("+std::to_string(numStoredBytes())+" instead of "+std::to_string(newNumBytes)+")");
} }
optional_ownership_ptr<DataLeafNode> DataTree::LastLeaf(DataNode *root) {
DataLeafNode *leaf = dynamic_cast<DataLeafNode*>(root);
if (leaf != nullptr) {
return WithoutOwnership(leaf);
}
DataInnerNode *inner = dynamic_cast<DataInnerNode*>(root);
auto lastChild = _nodeStore->load(inner->LastChild()->key());
ASSERT(lastChild != none, "Couldn't load last child");
return WithOwnership(LastLeaf(std::move(*lastChild)));
}
unique_ref<DataLeafNode> DataTree::LastLeaf(unique_ref<DataNode> root) {
auto leaf = dynamic_pointer_move<DataLeafNode>(root);
if (leaf != none) {
return std::move(*leaf);
}
auto inner = dynamic_pointer_move<DataInnerNode>(root);
ASSERT(inner != none, "Root node is neither a leaf nor an inner node");
auto child = _nodeStore->load((*inner)->LastChild()->key());
ASSERT(child != none, "Couldn't load last child");
return LastLeaf(std::move(*child));
}
uint64_t DataTree::maxBytesPerLeaf() const { uint64_t DataTree::maxBytesPerLeaf() const {
return _nodeStore->layout().maxBytesPerLeaf(); return _nodeStore->layout().maxBytesPerLeaf();
} }

View File

@ -47,29 +47,21 @@ private:
cpputils::unique_ref<datanodestore::DataNode> _rootNode; cpputils::unique_ref<datanodestore::DataNode> _rootNode;
mutable boost::optional<uint32_t> _numLeavesCache; mutable boost::optional<uint32_t> _numLeavesCache;
cpputils::unique_ref<datanodestore::DataLeafNode> addDataLeaf();
void removeLastDataLeaf();
cpputils::unique_ref<datanodestore::DataNode> releaseRootNode(); cpputils::unique_ref<datanodestore::DataNode> releaseRootNode();
friend class DataTreeStore; friend class DataTreeStore;
cpputils::unique_ref<datanodestore::DataLeafNode> addDataLeafAt(datanodestore::DataInnerNode *insertPos); void whileRootHasOnlyOneChildReplaceRootWithItsChild();
cpputils::optional_ownership_ptr<datanodestore::DataNode> createChainOfInnerNodes(unsigned int num, datanodestore::DataNode *child);
cpputils::unique_ref<datanodestore::DataNode> createChainOfInnerNodes(unsigned int num, cpputils::unique_ref<datanodestore::DataNode> child);
cpputils::unique_ref<datanodestore::DataLeafNode> addDataLeafToFullTree();
void deleteLastChildSubtree(datanodestore::DataInnerNode *node);
void ifRootHasOnlyOneChildReplaceRootWithItsChild();
//TODO Use underscore for private methods //TODO Use underscore for private methods
void _traverseLeaves(uint32_t beginIndex, uint32_t endIndex,
std::function<void (uint32_t index, datanodestore::DataLeafNode* leaf)> onExistingLeaf,
std::function<cpputils::Data (uint32_t index)> onCreateLeaf,
std::function<void (datanodestore::DataInnerNode *node)> onBacktrackFromSubtree);
uint32_t leavesPerFullChild(const datanodestore::DataInnerNode &root) const; uint32_t leavesPerFullChild(const datanodestore::DataInnerNode &root) const;
uint64_t _numStoredBytes() const; uint64_t _numStoredBytes() const;
uint64_t _numStoredBytes(const datanodestore::DataNode &root) const; uint64_t _numStoredBytes(const datanodestore::DataNode &root) const;
uint32_t _numLeaves() const; uint32_t _numLeaves() const;
uint32_t _computeNumLeaves(const datanodestore::DataNode &node) const; uint32_t _computeNumLeaves(const datanodestore::DataNode &node) const;
cpputils::optional_ownership_ptr<datanodestore::DataLeafNode> LastLeaf(datanodestore::DataNode *root);
cpputils::unique_ref<datanodestore::DataLeafNode> LastLeaf(cpputils::unique_ref<datanodestore::DataNode> root);
datanodestore::DataInnerNode* increaseTreeDepth(unsigned int levels);
DISALLOW_COPY_AND_ASSIGN(DataTree); DISALLOW_COPY_AND_ASSIGN(DataTree);
}; };

View File

@ -23,7 +23,7 @@ namespace blobstore {
: _nodeStore(nodeStore) { : _nodeStore(nodeStore) {
} }
unique_ref<DataNode> LeafTraverser::traverseAndReturnRoot(unique_ref<DataNode> root, uint32_t beginIndex, uint32_t endIndex, function<void (uint32_t index, DataLeafNode* leaf)> onExistingLeaf, function<Data (uint32_t index)> onCreateLeaf) { unique_ref<DataNode> LeafTraverser::traverseAndReturnRoot(unique_ref<DataNode> root, uint32_t beginIndex, uint32_t endIndex, function<void (uint32_t index, DataLeafNode* leaf)> onExistingLeaf, function<Data (uint32_t index)> onCreateLeaf, function<void (DataInnerNode *node)> onBacktrackFromSubtree) {
ASSERT(beginIndex <= endIndex, "Invalid parameters"); ASSERT(beginIndex <= endIndex, "Invalid parameters");
//TODO Test cases with numLeaves < / >= beginIndex, ideally test all configurations: //TODO Test cases with numLeaves < / >= beginIndex, ideally test all configurations:
@ -33,7 +33,7 @@ namespace blobstore {
uint32_t maxLeavesForDepth = _maxLeavesForTreeDepth(root->depth()); uint32_t maxLeavesForDepth = _maxLeavesForTreeDepth(root->depth());
_traverseExistingSubtree(root.get(), std::min(beginIndex, maxLeavesForDepth), std::min(endIndex, maxLeavesForDepth), 0, false, onExistingLeaf, onCreateLeaf); _traverseExistingSubtree(root.get(), std::min(beginIndex, maxLeavesForDepth), std::min(endIndex, maxLeavesForDepth), 0, false, onExistingLeaf, onCreateLeaf, onBacktrackFromSubtree);
// If the traversal goes too far right for a tree this depth, increase tree depth by one and continue traversal. // If the traversal goes too far right for a tree this depth, increase tree depth by one and continue traversal.
// This is recursive, i.e. will be repeated if the tree is still not deep enough. // This is recursive, i.e. will be repeated if the tree is still not deep enough.
@ -42,7 +42,7 @@ namespace blobstore {
if (endIndex > maxLeavesForDepth) { if (endIndex > maxLeavesForDepth) {
// TODO Test cases that increase tree depth by 0, 1, 2, ... levels // TODO Test cases that increase tree depth by 0, 1, 2, ... levels
auto newRoot = _increaseTreeDepth(std::move(root)); auto newRoot = _increaseTreeDepth(std::move(root));
return traverseAndReturnRoot(std::move(newRoot), std::max(beginIndex, maxLeavesForDepth), endIndex, onExistingLeaf, onCreateLeaf); return traverseAndReturnRoot(std::move(newRoot), std::max(beginIndex, maxLeavesForDepth), endIndex, onExistingLeaf, onCreateLeaf, onBacktrackFromSubtree);
} else { } else {
return std::move(root); return std::move(root);
} }
@ -53,7 +53,7 @@ namespace blobstore {
return DataNode::convertToNewInnerNode(std::move(root), *copyOfOldRoot); return DataNode::convertToNewInnerNode(std::move(root), *copyOfOldRoot);
} }
void LeafTraverser::_traverseExistingSubtree(DataNode *root, uint32_t beginIndex, uint32_t endIndex, uint32_t leafOffset, bool growLastLeaf, function<void (uint32_t index, DataLeafNode* leaf)> onExistingLeaf, function<Data (uint32_t index)> onCreateLeaf) { void LeafTraverser::_traverseExistingSubtree(DataNode *root, uint32_t beginIndex, uint32_t endIndex, uint32_t leafOffset, bool growLastLeaf, function<void (uint32_t index, DataLeafNode* leaf)> onExistingLeaf, function<Data (uint32_t index)> onCreateLeaf, function<void (DataInnerNode *node)> onBacktrackFromSubtree) {
ASSERT(beginIndex <= endIndex, "Invalid parameters"); ASSERT(beginIndex <= endIndex, "Invalid parameters");
//TODO Call callbacks for different leaves in parallel. //TODO Call callbacks for different leaves in parallel.
@ -92,7 +92,8 @@ namespace blobstore {
uint32_t negativeChildOffset = (numChildren-1) * leavesPerChild; uint32_t negativeChildOffset = (numChildren-1) * leavesPerChild;
_traverseExistingSubtree(childNode->get(), leavesPerChild-1, leavesPerChild, leafOffset - negativeChildOffset, true, _traverseExistingSubtree(childNode->get(), leavesPerChild-1, leavesPerChild, leafOffset - negativeChildOffset, true,
[] (uint32_t /*index*/, DataLeafNode* /*leaf*/) {}, [] (uint32_t /*index*/, DataLeafNode* /*leaf*/) {},
_createMaxSizeLeaf()); _createMaxSizeLeaf(),
[] (DataInnerNode* /*node*/) {});
} }
// Traverse existing children // Traverse existing children
@ -108,7 +109,7 @@ namespace blobstore {
bool isLastChild = (childIndex == numChildren - 1); bool isLastChild = (childIndex == numChildren - 1);
ASSERT(localEndIndex <= leavesPerChild, "We don't want the child to add a tree level because it doesn't have enough space for the traversal."); ASSERT(localEndIndex <= leavesPerChild, "We don't want the child to add a tree level because it doesn't have enough space for the traversal.");
_traverseExistingSubtree(childNode->get(), localBeginIndex, localEndIndex, leafOffset + childOffset, shouldGrowLastExistingLeaf && isLastChild, _traverseExistingSubtree(childNode->get(), localBeginIndex, localEndIndex, leafOffset + childOffset, shouldGrowLastExistingLeaf && isLastChild,
onExistingLeaf, onCreateLeaf); onExistingLeaf, onCreateLeaf, onBacktrackFromSubtree);
} }
// Traverse new children (including gap children, i.e. children that are created but not traversed because they're to the right of the current size, but to the left of the traversal region) // Traverse new children (including gap children, i.e. children that are created but not traversed because they're to the right of the current size, but to the left of the traversal region)
@ -117,12 +118,17 @@ namespace blobstore {
uint32_t localBeginIndex = std::min(leavesPerChild, utils::maxZeroSubtraction(beginIndex, childOffset)); uint32_t localBeginIndex = std::min(leavesPerChild, utils::maxZeroSubtraction(beginIndex, childOffset));
uint32_t localEndIndex = std::min(leavesPerChild, endIndex - childOffset); uint32_t localEndIndex = std::min(leavesPerChild, endIndex - childOffset);
auto leafCreator = (childIndex >= beginChild) ? onCreateLeaf : _createMaxSizeLeaf(); auto leafCreator = (childIndex >= beginChild) ? onCreateLeaf : _createMaxSizeLeaf();
auto child = _createNewSubtree(localBeginIndex, localEndIndex, leafOffset + childOffset, inner->depth() - 1, leafCreator); auto child = _createNewSubtree(localBeginIndex, localEndIndex, leafOffset + childOffset, inner->depth() - 1, leafCreator, onBacktrackFromSubtree);
inner->addChild(*child); inner->addChild(*child);
} }
// This is only a backtrack, if we actually visited a leaf here.
if (endIndex > beginIndex) {
onBacktrackFromSubtree(inner);
}
} }
unique_ref<DataNode> LeafTraverser::_createNewSubtree(uint32_t beginIndex, uint32_t endIndex, uint32_t leafOffset, uint8_t depth, function<Data (uint32_t index)> onCreateLeaf) { unique_ref<DataNode> LeafTraverser::_createNewSubtree(uint32_t beginIndex, uint32_t endIndex, uint32_t leafOffset, uint8_t depth, function<Data (uint32_t index)> onCreateLeaf, function<void (DataInnerNode *node)> onBacktrackFromSubtree) {
ASSERT(beginIndex <= endIndex, "Invalid parameters"); ASSERT(beginIndex <= endIndex, "Invalid parameters");
if (0 == depth) { if (0 == depth) {
ASSERT(beginIndex <= 1 && endIndex == 1, "With depth 0, we can only traverse one or zero leaves (i.e. traverse one leaf or traverse a gap leaf)."); ASSERT(beginIndex <= 1 && endIndex == 1, "With depth 0, we can only traverse one or zero leaves (i.e. traverse one leaf or traverse a gap leaf).");
@ -148,7 +154,8 @@ namespace blobstore {
for (uint32_t childIndex = 0; childIndex < beginChild; ++childIndex) { for (uint32_t childIndex = 0; childIndex < beginChild; ++childIndex) {
uint32_t childOffset = childIndex * leavesPerChild; uint32_t childOffset = childIndex * leavesPerChild;
auto child = _createNewSubtree(leavesPerChild, leavesPerChild, leafOffset + childOffset, depth - 1, auto child = _createNewSubtree(leavesPerChild, leavesPerChild, leafOffset + childOffset, depth - 1,
[] (uint32_t /*index*/)->Data {ASSERT(false, "We're only creating gap leaves here, not traversing any.");}); [] (uint32_t /*index*/)->Data {ASSERT(false, "We're only creating gap leaves here, not traversing any.");},
[] (DataInnerNode* /*node*/) {});
children.push_back(std::move(child)); children.push_back(std::move(child));
} }
// Create new children that are traversed // Create new children that are traversed
@ -156,7 +163,7 @@ namespace blobstore {
uint32_t childOffset = childIndex * leavesPerChild; uint32_t childOffset = childIndex * leavesPerChild;
uint32_t localBeginIndex = utils::maxZeroSubtraction(beginIndex, childOffset); uint32_t localBeginIndex = utils::maxZeroSubtraction(beginIndex, childOffset);
uint32_t localEndIndex = std::min(leavesPerChild, endIndex - childOffset); uint32_t localEndIndex = std::min(leavesPerChild, endIndex - childOffset);
auto child = _createNewSubtree(localBeginIndex, localEndIndex, leafOffset + childOffset, depth - 1, onCreateLeaf); auto child = _createNewSubtree(localBeginIndex, localEndIndex, leafOffset + childOffset, depth - 1, onCreateLeaf, onBacktrackFromSubtree);
children.push_back(std::move(child)); children.push_back(std::move(child));
} }
@ -166,6 +173,10 @@ namespace blobstore {
for (auto childIter = children.begin()+1; childIter != children.end(); ++childIter) { for (auto childIter = children.begin()+1; childIter != children.end(); ++childIter) {
newNode->addChild(**childIter); newNode->addChild(**childIter);
} }
// This is only a backtrack, if we actually created a leaf here.
if (endIndex > beginIndex) {
onBacktrackFromSubtree(newNode.get());
}
return newNode; return newNode;
} }

View File

@ -25,20 +25,23 @@ namespace blobstore {
public: public:
LeafTraverser(datanodestore::DataNodeStore *nodeStore); LeafTraverser(datanodestore::DataNodeStore *nodeStore);
cpputils::unique_ref<datanodestore::DataNode> __attribute__((warn_unused_result)) traverseAndReturnRoot( cpputils::unique_ref<datanodestore::DataNode> traverseAndReturnRoot(
cpputils::unique_ref<datanodestore::DataNode> root, uint32_t beginIndex, uint32_t endIndex, cpputils::unique_ref<datanodestore::DataNode> root, uint32_t beginIndex, uint32_t endIndex,
std::function<void (uint32_t index, datanodestore::DataLeafNode* leaf)> onExistingLeaf, std::function<void (uint32_t index, datanodestore::DataLeafNode* leaf)> onExistingLeaf,
std::function<cpputils::Data (uint32_t index)> onCreateLeaf); std::function<cpputils::Data (uint32_t index)> onCreateLeaf,
std::function<void (datanodestore::DataInnerNode *node)> onBacktrackFromSubtree);
private: private:
datanodestore::DataNodeStore *_nodeStore; datanodestore::DataNodeStore *_nodeStore;
void _traverseExistingSubtree(datanodestore::DataNode *root, uint32_t beginIndex, uint32_t endIndex, uint32_t leafOffset, bool growLastLeaf, void _traverseExistingSubtree(datanodestore::DataNode *root, uint32_t beginIndex, uint32_t endIndex, uint32_t leafOffset, bool growLastLeaf,
std::function<void (uint32_t index, datanodestore::DataLeafNode* leaf)> onExistingLeaf, std::function<void (uint32_t index, datanodestore::DataLeafNode* leaf)> onExistingLeaf,
std::function<cpputils::Data (uint32_t index)> onCreateLeaf); std::function<cpputils::Data (uint32_t index)> onCreateLeaf,
std::function<void (datanodestore::DataInnerNode *node)> onBacktrackFromSubtree);
cpputils::unique_ref<datanodestore::DataInnerNode> _increaseTreeDepth(cpputils::unique_ref<datanodestore::DataNode> root); cpputils::unique_ref<datanodestore::DataInnerNode> _increaseTreeDepth(cpputils::unique_ref<datanodestore::DataNode> root);
cpputils::unique_ref<datanodestore::DataNode> _createNewSubtree(uint32_t beginIndex, uint32_t endIndex, uint32_t leafOffset, uint8_t depth, cpputils::unique_ref<datanodestore::DataNode> _createNewSubtree(uint32_t beginIndex, uint32_t endIndex, uint32_t leafOffset, uint8_t depth,
std::function<cpputils::Data (uint32_t index)> onCreateLeaf); std::function<cpputils::Data (uint32_t index)> onCreateLeaf,
std::function<void (datanodestore::DataInnerNode *node)> onBacktrackFromSubtree);
uint32_t _maxLeavesForTreeDepth(uint8_t depth) const; uint32_t _maxLeavesForTreeDepth(uint8_t depth) const;
std::function<cpputils::Data (uint32_t index)> _createMaxSizeLeaf() const; std::function<cpputils::Data (uint32_t index)> _createMaxSizeLeaf() const;

View File

@ -136,6 +136,7 @@ public:
}; };
TEST_F(BlobSizeDataTest, BlobIsZeroedOutAfterGrowing) { TEST_F(BlobSizeDataTest, BlobIsZeroedOutAfterGrowing) {
//uint32_t LARGE_SIZE = 2*1024*1024;
blob->resize(LARGE_SIZE); blob->resize(LARGE_SIZE);
EXPECT_EQ(0, std::memcmp(readBlob(*blob).data(), ZEROES.data(), LARGE_SIZE)); EXPECT_EQ(0, std::memcmp(readBlob(*blob).data(), ZEROES.data(), LARGE_SIZE));
} }