- Use one block flush less when growing by a traversal (the one that grows the old last leaf to max size)

- Fix numLEaves() calculation
This commit is contained in:
Sebastian Messmer 2015-04-10 22:57:29 +02:00
parent bee68464dc
commit eaa60d3a53
3 changed files with 34 additions and 16 deletions

View File

@ -65,7 +65,6 @@ uint64_t BlobOnBlocks::tryRead(void *target, uint64_t offset, uint64_t count) co
} }
void BlobOnBlocks::write(const void *source, uint64_t offset, uint64_t size) { void BlobOnBlocks::write(const void *source, uint64_t offset, uint64_t size) {
//resizeIfSmallerThan(offset + size);
traverseLeaves(offset, size, [source, offset] (uint64_t indexOfFirstLeafByte, DataLeafNode *leaf, uint32_t leafDataOffset, uint32_t leafDataSize) { traverseLeaves(offset, size, [source, offset] (uint64_t indexOfFirstLeafByte, DataLeafNode *leaf, uint32_t leafDataOffset, uint32_t leafDataSize) {
//TODO Simplify formula, make it easier to understand //TODO Simplify formula, make it easier to understand
leaf->write((uint8_t*)source + indexOfFirstLeafByte - offset + leafDataOffset, leafDataOffset, leafDataSize); leaf->write((uint8_t*)source + indexOfFirstLeafByte - offset + leafDataOffset, leafDataOffset, leafDataSize);

View File

@ -129,15 +129,23 @@ unique_ptr<DataNode> DataTree::releaseRootNode() {
return std::move(_rootNode); return std::move(_rootNode);
} }
//TODO Test numLeaves() //TODO Test numLeaves(), for example also two configurations with same number of bytes but different number of leaves (last leaf has 0 bytes)
uint32_t DataTree::numLeaves() const { uint32_t DataTree::numLeaves() const {
//TODO Direct calculating the number of leaves would be faster return _numLeaves(*_rootNode);
uint64_t currentNumBytes = _numStoredBytes();
if(currentNumBytes == 0) {
//We always have at least one leaf
currentNumBytes = 1;
} }
return utils::ceilDivision(currentNumBytes, _nodeStore->layout().maxBytesPerLeaf());
uint32_t DataTree::_numLeaves(const DataNode &node) const {
const DataLeafNode *leaf = dynamic_cast<const DataLeafNode*>(&node);
if (leaf != nullptr) {
return 1;
}
const DataInnerNode &inner = dynamic_cast<const DataInnerNode&>(node);
uint64_t numLeavesInLeftChildren = (inner.numChildren()-1) * leavesPerFullChild(inner);
auto lastChild = _nodeStore->load(inner.LastChild()->key());
uint64_t numLeavesInRightChild = _numLeaves(*lastChild);
return numLeavesInLeftChildren + numLeavesInRightChild;
} }
void DataTree::traverseLeaves(uint32_t beginIndex, uint32_t endIndex, function<void (DataLeafNode*, uint32_t)> func) { void DataTree::traverseLeaves(uint32_t beginIndex, uint32_t endIndex, function<void (DataLeafNode*, uint32_t)> func) {
@ -150,20 +158,30 @@ void DataTree::traverseLeaves(uint32_t beginIndex, uint32_t endIndex, function<v
//TODO Test cases that actually increase it here by 0 level / 1 level / more than 1 level //TODO Test cases that actually increase it here by 0 level / 1 level / more than 1 level
increaseTreeDepth(neededTreeDepth - _rootNode->depth()); increaseTreeDepth(neededTreeDepth - _rootNode->depth());
} }
if (numLeaves < endIndex) {
//TODO Can this case be efficiently combined with the traversing? if (numLeaves <= beginIndex) {
LastLeaf(_rootNode.get())->resize(_nodeStore->layout().maxBytesPerLeaf());
}
uint32_t lastLeafIndex = std::max(numLeaves, endIndex) - 1;
if (numLeaves < beginIndex) {
//TODO Test cases with numLeaves < / >= beginIndex //TODO Test cases with numLeaves < / >= beginIndex
return _traverseLeaves(_rootNode.get(), 0, numLeaves, endIndex, [beginIndex, numLeaves, lastLeafIndex, &func](DataLeafNode* node, uint32_t index) { // There is a gap between the current size and the begin of the traversal
return _traverseLeaves(_rootNode.get(), 0, numLeaves-1, endIndex, [beginIndex, numLeaves, &func, this](DataLeafNode* node, uint32_t index) {
if (index >= beginIndex) { if (index >= beginIndex) {
func(node, index); func(node, index);
} else if (index == numLeaves - 1) {
// It is the old last leaf - resize it to maximum
node->resize(_nodeStore->layout().maxBytesPerLeaf());
} }
}); });
} else if (numLeaves < endIndex) {
// We are starting traversal in the valid region, but traverse until after it (we grow new leaves)
return _traverseLeaves(_rootNode.get(), 0, beginIndex, endIndex, [numLeaves, &func, this] (DataLeafNode *node, uint32_t index) {
if (index == numLeaves - 1) {
// It is the old last leaf - resize it to maximum
node->resize(_nodeStore->layout().maxBytesPerLeaf());
}
func(node, index);
});
} else { } else {
return _traverseLeaves(_rootNode.get(), 0, beginIndex, endIndex, func); //We are traversing entierly inside the valid region
_traverseLeaves(_rootNode.get(), 0, beginIndex, endIndex, func);
} }
} }

View File

@ -61,6 +61,7 @@ private:
uint32_t leavesPerFullChild(const datanodestore::DataInnerNode &root) const; uint32_t leavesPerFullChild(const datanodestore::DataInnerNode &root) const;
uint64_t _numStoredBytes() const; uint64_t _numStoredBytes() const;
uint64_t _numStoredBytes(const datanodestore::DataNode &root) const; uint64_t _numStoredBytes(const datanodestore::DataNode &root) const;
uint32_t _numLeaves(const datanodestore::DataNode &node) const;
cpputils::optional_ownership_ptr<datanodestore::DataLeafNode> LastLeaf(datanodestore::DataNode *root); cpputils::optional_ownership_ptr<datanodestore::DataLeafNode> LastLeaf(datanodestore::DataNode *root);
std::unique_ptr<datanodestore::DataLeafNode> LastLeaf(std::unique_ptr<datanodestore::DataNode> root); std::unique_ptr<datanodestore::DataLeafNode> LastLeaf(std::unique_ptr<datanodestore::DataNode> root);
datanodestore::DataInnerNode* increaseTreeDepth(unsigned int levels); datanodestore::DataInnerNode* increaseTreeDepth(unsigned int levels);