Adapt to new blockstore which has Block::write() instead of writeable Block::data() pointer

This commit is contained in:
Sebastian Messmer 2015-03-04 20:58:39 +01:00
parent b43464d669
commit 3a01f95467
14 changed files with 98 additions and 84 deletions

View File

@ -31,7 +31,7 @@ void BlobOnBlocks::resize(uint64_t numBytes) {
_datatree->resizeNumBytes(numBytes);
}
void BlobOnBlocks::traverseLeaves(uint64_t beginByte, uint64_t sizeBytes, function<void (uint64_t, void *, uint32_t)> func) const {
void BlobOnBlocks::traverseLeaves(uint64_t beginByte, uint64_t sizeBytes, function<void (uint64_t, DataLeafNode *leaf, uint32_t, uint32_t)> func) const {
uint64_t endByte = beginByte + sizeBytes;
assert(endByte <= size());
uint32_t firstLeaf = beginByte / _datatree->maxBytesPerLeaf();
@ -40,20 +40,20 @@ void BlobOnBlocks::traverseLeaves(uint64_t beginByte, uint64_t sizeBytes, functi
uint64_t indexOfFirstLeafByte = leafIndex * leaf->maxStoreableBytes();
uint32_t dataBegin = utils::maxZeroSubtraction(beginByte, indexOfFirstLeafByte);
uint32_t dataSize = std::min((uint64_t)leaf->maxStoreableBytes(), endByte - indexOfFirstLeafByte);
func(indexOfFirstLeafByte, (uint8_t*)leaf->data() + dataBegin, dataSize);
func(indexOfFirstLeafByte, leaf, dataBegin, dataSize);
});
}
void BlobOnBlocks::read(void *target, uint64_t offset, uint64_t size) const {
traverseLeaves(offset, size, [target] (uint64_t indexOfFirstLeafByte, void *leafDataBegin, uint32_t leafDataSize) {
std::memcpy(target, leafDataBegin, leafDataSize);
traverseLeaves(offset, size, [target] (uint64_t indexOfFirstLeafByte, const DataLeafNode *leaf, uint32_t leafDataOffset, uint32_t leafDataSize) {
leaf->read((uint8_t*)target + indexOfFirstLeafByte + leafDataOffset, leafDataOffset, leafDataSize);
});
}
void BlobOnBlocks::write(const void *source, uint64_t offset, uint64_t size) {
resizeIfSmallerThan(offset + size);
traverseLeaves(offset, size, [source] (uint64_t indexOfFirstLeafByte, void *leafDataBegin, uint32_t leafDataSize) {
std::memcpy(leafDataBegin, source, leafDataSize);
traverseLeaves(offset, size, [source] (uint64_t indexOfFirstLeafByte, DataLeafNode *leaf, uint32_t leafDataOffset, uint32_t leafDataSize) {
leaf->write((uint8_t*)source + indexOfFirstLeafByte + leafDataOffset, leafDataOffset, leafDataSize);
});
}

View File

@ -30,7 +30,7 @@ public:
private:
void traverseLeaves(uint64_t offsetBytes, uint64_t sizeBytes, std::function<void (uint64_t, void *, uint32_t)>) const;
void traverseLeaves(uint64_t offsetBytes, uint64_t sizeBytes, std::function<void (uint64_t, datanodestore::DataLeafNode *, uint32_t, uint32_t)>) const;
void resizeIfSmallerThan(uint64_t neededSize);
std::unique_ptr<datatreestore::DataTree> _datatree;

View File

@ -21,19 +21,15 @@ DataInnerNode::~DataInnerNode() {
unique_ptr<DataInnerNode> DataInnerNode::InitializeNewNode(unique_ptr<Block> block, const DataNode &first_child) {
DataNodeView node(std::move(block));
*node.Depth() = first_child.depth() + 1;
*node.Size() = 1;
node.setDepth(first_child.depth() + 1);
node.setSize(1);
auto result = make_unique<DataInnerNode>(std::move(node));
result->ChildrenBegin()->setKey(first_child.key());
return result;
}
uint8_t DataInnerNode::depth() const {
return *node().Depth();
}
uint32_t DataInnerNode::numChildren() const {
return *node().Size();
return node().Size();
}
DataInnerNode::ChildEntry *DataInnerNode::ChildrenBegin() {
@ -49,7 +45,7 @@ DataInnerNode::ChildEntry *DataInnerNode::ChildrenEnd() {
}
const DataInnerNode::ChildEntry *DataInnerNode::ChildrenEnd() const {
return ChildrenBegin() + *node().Size();
return ChildrenBegin() + node().Size();
}
DataInnerNode::ChildEntry *DataInnerNode::LastChild() {
@ -57,7 +53,7 @@ DataInnerNode::ChildEntry *DataInnerNode::LastChild() {
}
const DataInnerNode::ChildEntry *DataInnerNode::LastChild() const {
return ChildrenEnd()-1;
return getChild(numChildren()-1);
}
DataInnerNode::ChildEntry *DataInnerNode::getChild(unsigned int index) {
@ -72,13 +68,13 @@ const DataInnerNode::ChildEntry *DataInnerNode::getChild(unsigned int index) con
void DataInnerNode::addChild(const DataNode &child) {
assert(numChildren() < maxStoreableChildren());
assert(child.depth() == depth()-1);
*node().Size() += 1;
node().setSize(node().Size()+1);
LastChild()->setKey(child.key());
}
void DataInnerNode::removeLastChild() {
assert(*node().Size() > 1);
*node().Size() -= 1;
assert(node().Size() > 1);
node().setSize(node().Size()-1);
}
uint32_t DataInnerNode::maxStoreableChildren() const {

View File

@ -20,8 +20,6 @@ public:
uint32_t maxStoreableChildren() const;
uint8_t depth() const;
ChildEntry *getChild(unsigned int index);
const ChildEntry *getChild(unsigned int index) const;

View File

@ -13,7 +13,7 @@ namespace datanodestore {
DataLeafNode::DataLeafNode(DataNodeView view)
: DataNode(std::move(view)) {
assert(*node().Depth() == 0);
assert(node().Depth() == 0);
assert(numBytes() <= maxStoreableBytes());
}
@ -22,35 +22,39 @@ DataLeafNode::~DataLeafNode() {
unique_ptr<DataLeafNode> DataLeafNode::InitializeNewNode(unique_ptr<Block> block) {
DataNodeView node(std::move(block));
*node.Depth() = 0;
*node.Size() = 0;
node.setDepth(0);
node.setSize(0);
//fillDataWithZeroes(); not needed, because a newly created block will be zeroed out. DataLeafNodeTest.SpaceIsZeroFilledWhenGrowing ensures this.
return make_unique<DataLeafNode>(std::move(node));
}
void *DataLeafNode::data() {
return const_cast<void*>(const_cast<const DataLeafNode*>(this)->data());
void DataLeafNode::read(void *target, uint64_t offset, uint64_t size) const {
assert(offset <= node().Size() && offset + size <= node().Size()); // Also check offset, because the addition could lead to overflows
std::memcpy(target, (uint8_t*)node().data() + offset, size);
}
const void *DataLeafNode::data() const {
return node().DataBegin<uint8_t>();
void DataLeafNode::write(const void *source, uint64_t offset, uint64_t size) {
assert(offset <= node().Size() && offset + size <= node().Size()); // Also check offset, because the addition could lead to overflows
node().write(source, offset, size);
}
uint32_t DataLeafNode::numBytes() const {
return *node().Size();
return node().Size();
}
void DataLeafNode::resize(uint32_t new_size) {
assert(new_size <= maxStoreableBytes());
uint32_t old_size = *node().Size();
uint32_t old_size = node().Size();
if (new_size < old_size) {
fillDataWithZeroesFromTo(new_size, old_size);
}
*node().Size() = new_size;
node().setSize(new_size);
}
void DataLeafNode::fillDataWithZeroesFromTo(off_t begin, off_t end) {
std::memset(node().DataBegin<uint8_t>()+begin, 0, end-begin);
Data ZEROES(end-begin);
ZEROES.FillWithZeroes();
node().write(ZEROES.data(), begin, end-begin);
}
uint32_t DataLeafNode::maxStoreableBytes() const {

View File

@ -18,8 +18,8 @@ public:
uint32_t maxStoreableBytes() const;
void *data();
const void *data() const;
void read(void *target, uint64_t offset, uint64_t size) const;
void write(const void *source, uint64_t offset, uint64_t size);
uint32_t numBytes() const;

View File

@ -2,6 +2,7 @@
#include "DataLeafNode.h"
#include "DataNode.h"
#include "DataNodeStore.h"
#include <messmer/blockstore/utils/BlockStoreUtils.h>
using blockstore::Block;
using blockstore::Key;
@ -34,13 +35,13 @@ const Key &DataNode::key() const {
}
uint8_t DataNode::depth() const {
return *node().Depth();
return _node.Depth();
}
unique_ptr<DataInnerNode> DataNode::convertToNewInnerNode(unique_ptr<DataNode> node, const DataNode &first_child) {
Key key = node->key();
auto block = node->_node.releaseBlock();
std::memset(block->data(), 0, block->size());
blockstore::utils::fillWithZeroes(block.get());
return DataInnerNode::InitializeNewNode(std::move(block), first_child);
}

View File

@ -28,9 +28,9 @@ unique_ptr<DataNode> DataNodeStore::load(unique_ptr<Block> block) {
assert(block->size() == _layout.blocksizeBytes());
DataNodeView node(std::move(block));
if (*node.Depth() == 0) {
if (node.Depth() == 0) {
return unique_ptr<DataLeafNode>(new DataLeafNode(std::move(node)));
} else if (*node.Depth() <= MAX_DEPTH) {
} else if (node.Depth() <= MAX_DEPTH) {
return unique_ptr<DataInnerNode>(new DataInnerNode(std::move(node)));
} else {
throw runtime_error("Tree is to deep. Data corruption?");

View File

@ -10,6 +10,7 @@
#include <memory>
#include <stdexcept>
#include <type_traits>
namespace blobstore {
namespace onblocks {
@ -32,6 +33,7 @@ public:
//Where in the header is the size field (for inner nodes: number of children, for leafs: content data size)
static constexpr uint32_t SIZE_OFFSET_BYTES = 4;
//Size of a block (header + data region)
constexpr uint32_t blocksizeBytes() const {
return _blocksizeBytes;
@ -63,20 +65,28 @@ public:
DataNodeView(DataNodeView &&rhs) = default;
const uint8_t *Depth() const {
return GetOffset<DataNodeLayout::DEPTH_OFFSET_BYTES, uint8_t>();
uint8_t Depth() const {
return *((uint8_t*)_block->data()+DataNodeLayout::DEPTH_OFFSET_BYTES);
}
uint8_t *Depth() {
return const_cast<uint8_t*>(const_cast<const DataNodeView*>(this)->Depth());
void setDepth(uint8_t value) {
_block->write(&value, DataNodeLayout::DEPTH_OFFSET_BYTES, sizeof(value));
}
const uint32_t *Size() const {
return GetOffset<DataNodeLayout::SIZE_OFFSET_BYTES, uint32_t>();
uint32_t Size() const {
return *(uint32_t*)((uint8_t*)_block->data()+DataNodeLayout::SIZE_OFFSET_BYTES);
}
uint32_t *Size() {
return const_cast<uint32_t*>(const_cast<const DataNodeView*>(this)->Size());
void setSize(uint32_t value) {
_block->write(&value, DataNodeLayout::SIZE_OFFSET_BYTES, sizeof(value));
}
const void *data() const {
return (uint8_t*)_block->data() + DataNodeLayout::HEADERSIZE_BYTES;
}
void write(const void *source, uint64_t offset, uint64_t size) {
_block->write(source, offset + DataNodeLayout::HEADERSIZE_BYTES, size);
}
template<typename Entry>
@ -89,10 +99,6 @@ public:
return const_cast<Entry*>(const_cast<const DataNodeView*>(this)->DataBegin<Entry>());
}
DataNodeLayout layout() const {
return DataNodeLayout(_block->size());
}
template<typename Entry>
const Entry *DataEnd() const {
const unsigned int NUM_ENTRIES = layout().datasizeBytes() / sizeof(Entry);
@ -104,6 +110,11 @@ public:
return const_cast<Entry*>(const_cast<const DataNodeView*>(this)->DataEnd<Entry>());
}
DataNodeLayout layout() const {
return DataNodeLayout(_block->size());
}
std::unique_ptr<blockstore::Block> releaseBlock() {
return std::move(_block);
}

View File

@ -50,10 +50,16 @@ public:
std::memcpy(randomData.data(), dataFixture.data(), randomData.size());
}
Data loadData(const DataLeafNode &leaf) {
Data data(leaf.numBytes());
leaf.read(data.data(), 0, leaf.numBytes());
return data;
}
Key WriteDataToNewLeafBlockAndReturnKey() {
auto newleaf = nodeStore->createNewLeafNode();
newleaf->resize(randomData.size());
std::memcpy(newleaf->data(), randomData.data(), randomData.size());
newleaf->write(randomData.data(), 0, randomData.size());
return newleaf->key();
}
@ -63,7 +69,7 @@ public:
void FillLeafBlockWithData(DataLeafNode *leaf_to_fill) {
leaf_to_fill->resize(randomData.size());
std::memcpy(leaf_to_fill->data(), randomData.data(), randomData.size());
leaf_to_fill->write(randomData.data(), 0, randomData.size());
}
unique_ptr<DataLeafNode> LoadLeafNode(const Key &key) {
@ -139,7 +145,7 @@ TEST_F(DataLeafNodeTest, ReadWrittenDataAfterReloadingBlock) {
auto loaded = LoadLeafNode(key);
EXPECT_EQ(randomData.size(), loaded->numBytes());
EXPECT_EQ(0, std::memcmp(randomData.data(), loaded->data(), randomData.size()));
EXPECT_EQ(0, std::memcmp(randomData.data(), loadData(*loaded).data(), randomData.size()));
}
TEST_F(DataLeafNodeTest, NewLeafNodeHasSizeZero) {
@ -177,7 +183,7 @@ TEST_P(DataLeafNodeSizeTest, ResizeNode_ReadSizeAfterLoading) {
TEST_F(DataLeafNodeTest, SpaceIsZeroFilledWhenGrowing) {
leaf->resize(randomData.size());
EXPECT_EQ(0, std::memcmp(ZEROES.data(), leaf->data(), randomData.size()));
EXPECT_EQ(0, std::memcmp(ZEROES.data(), loadData(*leaf).data(), randomData.size()));
}
TEST_F(DataLeafNodeTest, SpaceGetsZeroFilledWhenShrinkingAndRegrowing) {
@ -188,7 +194,7 @@ TEST_F(DataLeafNodeTest, SpaceGetsZeroFilledWhenShrinkingAndRegrowing) {
leaf->resize(randomData.size());
//Check that the space was filled with zeroes
EXPECT_EQ(0, std::memcmp(ZEROES.data(), ((uint8_t*)leaf->data())+smaller_size, 100));
EXPECT_EQ(0, std::memcmp(ZEROES.data(), ((uint8_t*)loadData(*leaf).data())+smaller_size, 100));
}
TEST_F(DataLeafNodeTest, DataGetsZeroFilledWhenShrinking) {
@ -214,7 +220,7 @@ TEST_F(DataLeafNodeTest, ShrinkingDoesntDestroyValidDataRegion) {
leaf->resize(smaller_size);
//Check that the remaining data region is unchanged
EXPECT_EQ(0, std::memcmp(randomData.data(), leaf->data(), smaller_size));
EXPECT_EQ(0, std::memcmp(randomData.data(), loadData(*leaf).data(), smaller_size));
}
TEST_F(DataLeafNodeTest, ConvertToInternalNode) {
@ -249,8 +255,13 @@ TEST_F(DataLeafNodeTest, CopyDataLeaf) {
auto copied = CopyLeafNode(*leaf);
EXPECT_EQ(leaf->numBytes(), copied->numBytes());
EXPECT_EQ(0, std::memcmp(leaf->data(), copied->data(), leaf->numBytes()));
EXPECT_NE(leaf->data(), copied->data());
EXPECT_EQ(0, std::memcmp(loadData(*leaf).data(), loadData(*copied).data(), leaf->numBytes()));
//Test that they have different data regions (changing the original one doesn't change the copy)
char data = '\0';
leaf->write(&data, 0, 1);
EXPECT_EQ(data, *(char*)loadData(*leaf).data());
EXPECT_NE(data, *(char*)loadData(*copied).data());
}
/* TODO

View File

@ -77,7 +77,7 @@ TEST_F(DataNodeStoreTest, DataNodeCrashesOnLoadIfDepthIsTooHigh) {
Key key = block->key();
{
DataNodeView view(std::move(block));
*view.Depth() = DataNodeStore::MAX_DEPTH + 1;
view.setDepth(DataNodeStore::MAX_DEPTH + 1);
}
EXPECT_ANY_THROW(

View File

@ -36,10 +36,10 @@ TEST_P(DataNodeViewDepthTest, DepthIsStored) {
auto key = block->key();
{
DataNodeView view(std::move(block));
*view.Depth() = GetParam();
view.setDepth(GetParam());
}
DataNodeView view(blockStore->load(key));
EXPECT_EQ(GetParam(), *view.Depth());
EXPECT_EQ(GetParam(), view.Depth());
}
class DataNodeViewSizeTest: public DataNodeViewTest, public WithParamInterface<uint32_t> {
@ -51,10 +51,10 @@ TEST_P(DataNodeViewSizeTest, SizeIsStored) {
auto key = block->key();
{
DataNodeView view(std::move(block));
*view.Size() = GetParam();
view.setSize(GetParam());
}
DataNodeView view(blockStore->load(key));
EXPECT_EQ(GetParam(), *view.Size());
EXPECT_EQ(GetParam(), view.Size());
}
TEST_F(DataNodeViewTest, DataIsStored) {
@ -75,13 +75,13 @@ TEST_F(DataNodeViewTest, HeaderAndBodyDontOverlap) {
auto key = block->key();
{
DataNodeView view(std::move(block));
*view.Depth() = 3;
*view.Size() = 1000000000u;
view.setDepth(3);
view.setSize(1000000000u);
std::memcpy(view.DataBegin<uint8_t>(), randomData.data(), DATASIZE_BYTES);
}
DataNodeView view(blockStore->load(key));
EXPECT_EQ(3, *view.Depth());
EXPECT_EQ(1000000000u, *view.Size());
EXPECT_EQ(3, view.Depth());
EXPECT_EQ(1000000000u, view.Size());
EXPECT_EQ(0, std::memcmp(view.DataBegin<uint8_t>(), randomData.data(), DATASIZE_BYTES));
}
@ -144,3 +144,5 @@ TEST_F(DataNodeViewTest, DataEndWorksWithStructByteEntries) {
EXPECT_EQ(blockBegin+DataNodeLayout::HEADERSIZE_BYTES + numFittingEntries * sizeof(SizedDataEntry), dataEnd);
EXPECT_LT(dataEnd, blockBegin + BLOCKSIZE_BYTES);
}
//TODO Test that header fields (and data) are also stored over reloads

View File

@ -201,20 +201,6 @@ TEST_P(DataTreeTest_ResizeNumBytes_P, DataStaysIntact) {
}
}
TEST_P(DataTreeTest_ResizeNumBytes_P, UnusedEndOfLastLeafIsZero) {
uint32_t oldNumberOfLeaves = std::max(1u, ceilDivision(tree->numStoredBytes(), nodeStore->layout().maxBytesPerLeaf()));
TwoLevelDataFixture data(nodeStore, TwoLevelDataFixture::SizePolicy::Unchanged);
Key key = tree->key();
tree.reset();
data.FillInto(nodeStore->load(key).get());
ResizeTree(key, newSize);
auto lastLeaf = LastLeaf(key);
EXPECT_EQ(0, std::memcmp(ZEROES.data(), (char*)lastLeaf->data()+lastLeaf->numBytes(), LAYOUT.maxBytesPerLeaf()-lastLeaf->numBytes()));
}
//Resize to zero is not caught in the parametrized test above, in the following, we test it separately.
TEST_F(DataTreeTest_ResizeNumBytes, ResizeToZero_NumBytesIsCorrect) {

View File

@ -15,20 +15,25 @@ public:
void FillInto(blobstore::onblocks::datanodestore::DataLeafNode *leaf) const {
leaf->resize(_data.size());
std::memcpy(leaf->data(), _data.data(), _data.size());
leaf->write(_data.data(), 0, _data.size());
}
void EXPECT_DATA_CORRECT(const blobstore::onblocks::datanodestore::DataLeafNode &leaf, int onlyCheckNumBytes = -1) const {
if (onlyCheckNumBytes == -1) {
EXPECT_EQ(_data.size(), leaf.numBytes());
EXPECT_EQ(0, std::memcmp(_data.data(), leaf.data(), _data.size()));
EXPECT_EQ(0, std::memcmp(_data.data(), loadData(leaf).data(), _data.size()));
} else {
EXPECT_LE(onlyCheckNumBytes, leaf.numBytes());
EXPECT_EQ(0, std::memcmp(_data.data(), leaf.data(), onlyCheckNumBytes));
EXPECT_EQ(0, std::memcmp(_data.data(), loadData(leaf).data(), onlyCheckNumBytes));
}
}
private:
static blockstore::Data loadData(const blobstore::onblocks::datanodestore::DataLeafNode &leaf) {
blockstore::Data data(leaf.numBytes());
leaf.read(data.data(), 0, leaf.numBytes());
return data;
}
DataBlockFixture _data;
};