Fix locking

This commit is contained in:
Sebastian Messmer 2015-04-09 16:30:36 +02:00
parent 9a959cfab9
commit 3f43dcfe10
2 changed files with 29 additions and 22 deletions

View File

@ -112,7 +112,6 @@ unique_ptr<DataNode> DataTree::releaseRootNode() {
} }
void DataTree::traverseLeaves(uint32_t beginIndex, uint32_t endIndex, function<void (DataLeafNode*, uint32_t)> func) { void DataTree::traverseLeaves(uint32_t beginIndex, uint32_t endIndex, function<void (DataLeafNode*, uint32_t)> func) {
shared_lock<shared_mutex> lock(_mutex);
const_cast<const DataTree*>(this)->traverseLeaves(beginIndex, endIndex, [func](const DataLeafNode* leaf, uint32_t leafIndex) { const_cast<const DataTree*>(this)->traverseLeaves(beginIndex, endIndex, [func](const DataLeafNode* leaf, uint32_t leafIndex) {
func(const_cast<DataLeafNode*>(leaf), leafIndex); func(const_cast<DataLeafNode*>(leaf), leafIndex);
}); });
@ -156,11 +155,15 @@ uint32_t DataTree::leavesPerFullChild(const DataInnerNode &root) const {
} }
uint64_t DataTree::numStoredBytes() const { uint64_t DataTree::numStoredBytes() const {
return numStoredBytes(*_rootNode); shared_lock<shared_mutex> lock(_mutex);
return _numStoredBytes();
} }
uint64_t DataTree::numStoredBytes(const DataNode &root) const { uint64_t DataTree::_numStoredBytes() const {
shared_lock<shared_mutex> lock(_mutex); return _numStoredBytes(*_rootNode);
}
uint64_t DataTree::_numStoredBytes(const DataNode &root) const {
const DataLeafNode *leaf = dynamic_cast<const DataLeafNode*>(&root); const DataLeafNode *leaf = dynamic_cast<const DataLeafNode*>(&root);
if (leaf != nullptr) { if (leaf != nullptr) {
return leaf->numBytes(); return leaf->numBytes();
@ -169,16 +172,18 @@ uint64_t DataTree::numStoredBytes(const DataNode &root) const {
const DataInnerNode &inner = dynamic_cast<const DataInnerNode&>(root); const DataInnerNode &inner = dynamic_cast<const DataInnerNode&>(root);
uint64_t numBytesInLeftChildren = (inner.numChildren()-1) * leavesPerFullChild(inner) * _nodeStore->layout().maxBytesPerLeaf(); uint64_t numBytesInLeftChildren = (inner.numChildren()-1) * leavesPerFullChild(inner) * _nodeStore->layout().maxBytesPerLeaf();
auto lastChild = _nodeStore->load(inner.LastChild()->key()); auto lastChild = _nodeStore->load(inner.LastChild()->key());
uint64_t numBytesInRightChild = numStoredBytes(*lastChild); uint64_t numBytesInRightChild = _numStoredBytes(*lastChild);
return numBytesInLeftChildren + numBytesInRightChild; return numBytesInLeftChildren + numBytesInRightChild;
} }
void DataTree::resizeNumBytes(uint64_t newNumBytes) { void DataTree::resizeNumBytes(uint64_t newNumBytes) {
unique_lock<shared_mutex> lock(_mutex); boost::upgrade_lock<shared_mutex> lock(_mutex);
{
boost::upgrade_to_unique_lock<shared_mutex> exclusiveLock(lock);
//TODO Faster implementation possible (no addDataLeaf()/removeLastDataLeaf() in a loop, but directly resizing) //TODO Faster implementation possible (no addDataLeaf()/removeLastDataLeaf() in a loop, but directly resizing)
LastLeaf(_rootNode.get())->resize(_nodeStore->layout().maxBytesPerLeaf()); LastLeaf(_rootNode.get())->resize(_nodeStore->layout().maxBytesPerLeaf());
uint64_t currentNumBytes = numStoredBytes(); uint64_t currentNumBytes = _numStoredBytes();
assert(currentNumBytes % _nodeStore->layout().maxBytesPerLeaf() == 0); assert(currentNumBytes % _nodeStore->layout().maxBytesPerLeaf() == 0);
uint32_t currentNumLeaves = currentNumBytes / _nodeStore->layout().maxBytesPerLeaf(); uint32_t currentNumLeaves = currentNumBytes / _nodeStore->layout().maxBytesPerLeaf();
uint32_t newNumLeaves = std::max(1u, utils::ceilDivision(newNumBytes, _nodeStore->layout().maxBytesPerLeaf())); uint32_t newNumLeaves = std::max(1u, utils::ceilDivision(newNumBytes, _nodeStore->layout().maxBytesPerLeaf()));
@ -191,9 +196,9 @@ void DataTree::resizeNumBytes(uint64_t newNumBytes) {
} }
uint32_t newLastLeafSize = newNumBytes - (newNumLeaves-1)*_nodeStore->layout().maxBytesPerLeaf(); uint32_t newLastLeafSize = newNumBytes - (newNumLeaves-1)*_nodeStore->layout().maxBytesPerLeaf();
LastLeaf(_rootNode.get())->resize(newLastLeafSize); LastLeaf(_rootNode.get())->resize(newLastLeafSize);
assert(newNumBytes == numStoredBytes());
flush(); flush();
}
assert(newNumBytes == numStoredBytes());
} }
optional_ownership_ptr<DataLeafNode> DataTree::LastLeaf(DataNode *root) { optional_ownership_ptr<DataLeafNode> DataTree::LastLeaf(DataNode *root) {

View File

@ -55,9 +55,11 @@ private:
void deleteLastChildSubtree(datanodestore::DataInnerNode *node); void deleteLastChildSubtree(datanodestore::DataInnerNode *node);
void ifRootHasOnlyOneChildReplaceRootWithItsChild(); void ifRootHasOnlyOneChildReplaceRootWithItsChild();
//TODO Use underscore for private methods
void traverseLeaves(const datanodestore::DataNode *root, uint32_t leafOffset, uint32_t beginIndex, uint32_t endIndex, std::function<void (const datanodestore::DataLeafNode*, uint32_t)> func) const; void traverseLeaves(const datanodestore::DataNode *root, uint32_t leafOffset, uint32_t beginIndex, uint32_t endIndex, std::function<void (const datanodestore::DataLeafNode*, uint32_t)> func) const;
uint32_t leavesPerFullChild(const datanodestore::DataInnerNode &root) const; uint32_t leavesPerFullChild(const datanodestore::DataInnerNode &root) const;
uint64_t numStoredBytes(const datanodestore::DataNode &root) const; uint64_t _numStoredBytes() const;
uint64_t _numStoredBytes(const datanodestore::DataNode &root) const;
cpputils::optional_ownership_ptr<datanodestore::DataLeafNode> LastLeaf(datanodestore::DataNode *root); cpputils::optional_ownership_ptr<datanodestore::DataLeafNode> LastLeaf(datanodestore::DataNode *root);
std::unique_ptr<datanodestore::DataLeafNode> LastLeaf(std::unique_ptr<datanodestore::DataNode> root); std::unique_ptr<datanodestore::DataLeafNode> LastLeaf(std::unique_ptr<datanodestore::DataNode> root);