diff --git a/biicode.conf b/biicode.conf index 07da8e10..f8103deb 100644 --- a/biicode.conf +++ b/biicode.conf @@ -1,6 +1,7 @@ # Biicode configuration file [requirements] + cryptopp/cryptopp: 8 google/gmock: 4 google/gtest: 11 messmer/cmake: 3 diff --git a/implementations/compressing/CompressedBlock.cpp b/implementations/compressing/CompressedBlock.cpp new file mode 100644 index 00000000..e226e7e8 --- /dev/null +++ b/implementations/compressing/CompressedBlock.cpp @@ -0,0 +1 @@ +#include "CompressedBlock.h" diff --git a/implementations/compressing/CompressedBlock.h b/implementations/compressing/CompressedBlock.h new file mode 100644 index 00000000..cf7b2301 --- /dev/null +++ b/implementations/compressing/CompressedBlock.h @@ -0,0 +1,127 @@ +#pragma once +#ifndef MESSMER_BLOCKSTORE_IMPLEMENTATIONS_COMPRESSING_COMPRESSEDBLOCK_H_ +#define MESSMER_BLOCKSTORE_IMPLEMENTATIONS_COMPRESSING_COMPRESSEDBLOCK_H_ + +#include "../../interface/Block.h" +#include "../../interface/BlockStore.h" +#include +#include +#include + +namespace blockstore { +class BlockStore; +namespace compressing { +template class CompressingBlockStore; + +template +class CompressedBlock final: public Block { +public: + static boost::optional> TryCreateNew(BlockStore *baseBlockStore, const Key &key, cpputils::Data decompressedData); + static cpputils::unique_ref Decompress(cpputils::unique_ref baseBlock); + + CompressedBlock(cpputils::unique_ref baseBlock, cpputils::Data decompressedData); + ~CompressedBlock(); + + const void *data() const override; + void write(const void *source, uint64_t offset, uint64_t size) override; + + void flush() override; + + size_t size() const override; + void resize(size_t newSize) override; + + cpputils::unique_ref releaseBaseBlock(); + +private: + void _compressToBaseBlock(); + + cpputils::unique_ref _baseBlock; + cpputils::Data _decompressedData; + std::mutex _mutex; + bool _dataChanged; + + DISALLOW_COPY_AND_ASSIGN(CompressedBlock); +}; + +template +boost::optional>> 
CompressedBlock::TryCreateNew(BlockStore *baseBlockStore, const Key &key, cpputils::Data decompressedData) { + cpputils::Data compressed = Compressor::Compress(decompressedData); + auto baseBlock = baseBlockStore->tryCreate(key, std::move(compressed)); + if (baseBlock == boost::none) { + //TODO Test this code branch + return boost::none; + } + + return cpputils::make_unique_ref>(std::move(*baseBlock), std::move(decompressedData)); +} + +template +cpputils::unique_ref> CompressedBlock::Decompress(cpputils::unique_ref baseBlock) { + cpputils::Data decompressed = Compressor::Decompress((byte*)baseBlock->data(), baseBlock->size()); + return cpputils::make_unique_ref>(std::move(baseBlock), std::move(decompressed)); +} + +template +CompressedBlock::CompressedBlock(cpputils::unique_ref baseBlock, cpputils::Data decompressedData) + : Block(baseBlock->key()), + _baseBlock(std::move(baseBlock)), + _decompressedData(std::move(decompressedData)), + _dataChanged(false) { +} + +template +CompressedBlock::~CompressedBlock() { + std::unique_lock lock(_mutex); + _compressToBaseBlock(); +} + +template +const void *CompressedBlock::data() const { + return _decompressedData.data(); +} + +template +void CompressedBlock::write(const void *source, uint64_t offset, uint64_t size) { + std::memcpy((uint8_t*)_decompressedData.dataOffset(offset), source, size); + _dataChanged = true; +} + +template +void CompressedBlock::flush() { + std::unique_lock lock(_mutex); + _compressToBaseBlock(); + return _baseBlock->flush(); +} + +template +size_t CompressedBlock::size() const { + return _decompressedData.size(); +} + +template +void CompressedBlock::resize(size_t newSize) { + _decompressedData = cpputils::DataUtils::resize(std::move(_decompressedData), newSize); + _dataChanged = true; +} + +template +cpputils::unique_ref CompressedBlock::releaseBaseBlock() { + std::unique_lock lock(_mutex); + _compressToBaseBlock(); + return std::move(_baseBlock); +} + +template +void 
CompressedBlock::_compressToBaseBlock() { + if (_dataChanged) { + cpputils::Data compressed = Compressor::Compress(_decompressedData); + _baseBlock->resize(compressed.size()); + _baseBlock->write(compressed.data(), 0, compressed.size()); + _dataChanged = false; + } +} + +} +} + +#endif diff --git a/implementations/compressing/CompressingBlockStore.cpp b/implementations/compressing/CompressingBlockStore.cpp new file mode 100644 index 00000000..ab79d260 --- /dev/null +++ b/implementations/compressing/CompressingBlockStore.cpp @@ -0,0 +1 @@ +#include "CompressingBlockStore.h" diff --git a/implementations/compressing/CompressingBlockStore.h b/implementations/compressing/CompressingBlockStore.h new file mode 100644 index 00000000..06b22c66 --- /dev/null +++ b/implementations/compressing/CompressingBlockStore.h @@ -0,0 +1,77 @@ +#pragma once +#ifndef MESSMER_BLOCKSTORE_IMPLEMENTATIONS_COMPRESSING_COMPRESSINGBLOCKSTORE_H_ +#define MESSMER_BLOCKSTORE_IMPLEMENTATIONS_COMPRESSING_COMPRESSINGBLOCKSTORE_H_ + +#include "../../interface/BlockStore.h" +#include "CompressedBlock.h" + +namespace blockstore { +namespace compressing { + +template +class CompressingBlockStore final: public BlockStore { +public: + CompressingBlockStore(cpputils::unique_ref baseBlockStore); + ~CompressingBlockStore(); + + Key createKey() override; + boost::optional> tryCreate(const Key &key, cpputils::Data data) override; + boost::optional> load(const Key &key) override; + void remove(cpputils::unique_ref block) override; + uint64_t numBlocks() const override; + +private: + cpputils::unique_ref _baseBlockStore; + + DISALLOW_COPY_AND_ASSIGN(CompressingBlockStore); +}; + +template +CompressingBlockStore::CompressingBlockStore(cpputils::unique_ref baseBlockStore) + : _baseBlockStore(std::move(baseBlockStore)) { +} + +template +CompressingBlockStore::~CompressingBlockStore() { +} + +template +Key CompressingBlockStore::createKey() { + return _baseBlockStore->createKey(); +} + +template +boost::optional> 
CompressingBlockStore::tryCreate(const Key &key, cpputils::Data data) { + auto result = CompressedBlock::TryCreateNew(_baseBlockStore.get(), key, std::move(data)); + if (result == boost::none) { + return boost::none; + } + return cpputils::unique_ref(std::move(*result)); +} + +template +boost::optional> CompressingBlockStore::load(const Key &key) { + auto loaded = _baseBlockStore->load(key); + if (loaded == boost::none) { + return boost::none; + } + return boost::optional>(CompressedBlock::Decompress(std::move(*loaded))); +} + +template +void CompressingBlockStore::remove(cpputils::unique_ref block) { + auto _block = cpputils::dynamic_pointer_move>(block); + ASSERT(_block != boost::none, "Wrong block type"); + auto baseBlock = (*_block)->releaseBaseBlock(); + return _baseBlockStore->remove(std::move(baseBlock)); +} + +template +uint64_t CompressingBlockStore::numBlocks() const { + return _baseBlockStore->numBlocks(); +} + +} +} + +#endif diff --git a/implementations/compressing/compressors/Gzip.cpp b/implementations/compressing/compressors/Gzip.cpp new file mode 100644 index 00000000..4c2d59da --- /dev/null +++ b/implementations/compressing/compressors/Gzip.cpp @@ -0,0 +1,29 @@ +#include "Gzip.h" +#include + +using cpputils::Data; + +namespace blockstore { + namespace compressing { + + Data Gzip::Compress(const Data &data) { + CryptoPP::Gzip zipper; + zipper.Put((byte *) data.data(), data.size()); + zipper.MessageEnd(); + Data compressed(zipper.MaxRetrievable()); + zipper.Get((byte *) compressed.data(), compressed.size()); + return compressed; + } + + Data Gzip::Decompress(const void *data, size_t size) { + //TODO Change interface to taking cpputils::Data objects (needs changing blockstore so we can read their "class Data", because this is called from CompressedBlock::Decompress()). 
+ CryptoPP::Gunzip zipper; + zipper.Put((byte *) data, size); + zipper.MessageEnd(); + Data decompressed(zipper.MaxRetrievable()); + zipper.Get((byte *) decompressed.data(), decompressed.size()); + return decompressed; + } + + } +} \ No newline at end of file diff --git a/implementations/compressing/compressors/Gzip.h b/implementations/compressing/compressors/Gzip.h new file mode 100644 index 00000000..ab53e372 --- /dev/null +++ b/implementations/compressing/compressors/Gzip.h @@ -0,0 +1,18 @@ +#pragma once +#ifndef MESSMER_BLOCKSTORE_IMPLEMENTATIONS_COMPRESSING_COMPRESSORS_GZIP_H +#define MESSMER_BLOCKSTORE_IMPLEMENTATIONS_COMPRESSING_COMPRESSORS_GZIP_H + +#include + +namespace blockstore { + namespace compressing { + class Gzip { + public: + static cpputils::Data Compress(const cpputils::Data &data); + + static cpputils::Data Decompress(const void *data, size_t size); + }; + } +} + +#endif diff --git a/implementations/compressing/compressors/RunLengthEncoding.cpp b/implementations/compressing/compressors/RunLengthEncoding.cpp new file mode 100644 index 00000000..ea190bfb --- /dev/null +++ b/implementations/compressing/compressors/RunLengthEncoding.cpp @@ -0,0 +1,141 @@ +#include "RunLengthEncoding.h" +#include +#include + +using cpputils::Data; +using std::string; +using std::ostringstream; +using std::istringstream; + +namespace blockstore { + namespace compressing { + + // Alternately store a run of arbitrary bytes and a run of identical bytes. + // Each run is preceded by its length. Length fields are uint16_t. + // Example: 2 - 5 - 8 - 10 - 3 - 0 - 2 - 0 + // Length 2 arbitrary bytes (values: 5, 8), the next 10 bytes store "3" each, + // then 0 arbitrary bytes and 2x "0". 
+ + Data RunLengthEncoding::Compress(const Data &data) { + ostringstream compressed; + uint8_t *current = (uint8_t*)data.data(); + uint8_t *end = (uint8_t*)data.data()+data.size(); + while (current < end) { + _encodeArbitraryWords(&current, end, &compressed); + ASSERT(current <= end, "Overflow"); + if (current == end) { + break; + } + _encodeIdenticalWords(&current, end, &compressed); + ASSERT(current <= end, "Overflow"); + } + return _extractData(&compressed); + } + + void RunLengthEncoding::_encodeArbitraryWords(uint8_t **current, uint8_t* end, ostringstream *output) { + uint16_t size = _arbitraryRunLength(*current, end); + output->write((const char*)&size, sizeof(uint16_t)); + output->write((const char*)*current, size); + *current += size; + } + + uint16_t RunLengthEncoding::_arbitraryRunLength(uint8_t *start, uint8_t* end) { + // Stopping an arbitrary bytes run costs us 5 bytes, because we have to store the length + // for the identical bytes run (2 bytes), the identical byte itself (1 byte) and the length for the next arbitrary bytes run (2 bytes). + // So to get an advantage from stopping an arbitrary bytes run, at least 6 bytes have to be identical. + + // realEnd avoids an overflow of the 16bit counter + uint8_t *realEnd = std::min(end, start + std::numeric_limits::max()); + + // Count the number of identical bytes and return as soon as a run of at least 6 identical bytes is found. + uint8_t lastByte = *start + 1; // Something different from the first byte + uint8_t numIdenticalBytes = 1; + for(uint8_t *current = start; current != realEnd; ++current) { + if (*current == lastByte) { + ++numIdenticalBytes; + if (numIdenticalBytes == 6) { + return current - start - 5; //-5, because current points to the sixth identical byte, but the arbitrary byte run has to end at the first identical byte. + } + } else { + numIdenticalBytes = 1; + } + lastByte = *current; + } + //It wasn't worth stopping the arbitrary bytes run anywhere. The whole region should be an arbitrary run. 
+ return realEnd-start; + } + + void RunLengthEncoding::_encodeIdenticalWords(uint8_t **current, uint8_t* end, ostringstream *output) { + uint16_t size = _countIdenticalBytes(*current, end); + output->write((const char*)&size, sizeof(uint16_t)); + output->write((const char*)*current, 1); + *current += size; + } + + uint16_t RunLengthEncoding::_countIdenticalBytes(uint8_t *start, uint8_t *end) { + uint8_t *realEnd = std::min(end, start + std::numeric_limits::max()); // This prevents overflow of the 16bit counter + for (uint8_t *current = start+1; current != realEnd; ++current) { + if (*current != *start) { + return current-start; + } + } + // All bytes have been identical + return realEnd - start; + } + + Data RunLengthEncoding::_extractData(ostringstream *stream) { + string str = stream->str(); + Data data(str.size()); + std::memcpy(data.data(), str.c_str(), str.size()); + return data; + } + + Data RunLengthEncoding::Decompress(const void *data, size_t size) { + istringstream stream = _parseData((uint8_t*)data, size); + ostringstream decompressed; + while(_hasData(&stream)) { + _decodeArbitraryWords(&stream, &decompressed); + if (!_hasData(&stream)) { + break; + } + _decodeIdenticalWords(&stream, &decompressed); + } + return _extractData(&decompressed); + } + + bool RunLengthEncoding::_hasData(istringstream *str) { + str->peek(); + return !str->eof(); + } + + istringstream RunLengthEncoding::_parseData(const uint8_t *data, size_t size) { + string str((const char*)data, size); + istringstream result; + result.str(str); + return result; + } + + void RunLengthEncoding::_decodeArbitraryWords(istringstream *stream, ostringstream *decompressed) { + uint16_t size; + stream->read((char*)&size, sizeof(uint16_t)); + ASSERT(stream->good(), "Premature end of stream"); + Data run(size); + stream->read((char*)run.data(), size); + ASSERT(stream->good(), "Premature end of stream"); + decompressed->write((const char*)run.data(), run.size()); + } + + void 
RunLengthEncoding::_decodeIdenticalWords(istringstream *stream, ostringstream *decompressed) { + uint16_t size; + stream->read((char*)&size, sizeof(uint16_t)); + ASSERT(stream->good(), "Premature end of stream"); + uint8_t value; + stream->read((char*)&value, 1); + ASSERT(stream->good(), "Premature end of stream"); + Data run(size); + std::memset(run.data(), value, run.size()); + decompressed->write((const char*)run.data(), run.size()); + } + + } +} diff --git a/implementations/compressing/compressors/RunLengthEncoding.h b/implementations/compressing/compressors/RunLengthEncoding.h new file mode 100644 index 00000000..fbc88ed5 --- /dev/null +++ b/implementations/compressing/compressors/RunLengthEncoding.h @@ -0,0 +1,29 @@ +#pragma once +#ifndef MESSMER_BLOCKSTORE_IMPLEMENTATIONS_COMPRESSING_COMPRESSORS_RUNLENGTHENCODING_H +#define MESSMER_BLOCKSTORE_IMPLEMENTATIONS_COMPRESSING_COMPRESSORS_RUNLENGTHENCODING_H + +#include + +namespace blockstore { + namespace compressing { + class RunLengthEncoding { + public: + static cpputils::Data Compress(const cpputils::Data &data); + + static cpputils::Data Decompress(const void *data, size_t size); + + private: + static void _encodeArbitraryWords(uint8_t **current, uint8_t* end, std::ostringstream *output); + static uint16_t _arbitraryRunLength(uint8_t *start, uint8_t* end); + static void _encodeIdenticalWords(uint8_t **current, uint8_t* end, std::ostringstream *output); + static uint16_t _countIdenticalBytes(uint8_t *start, uint8_t *end); + static bool _hasData(std::istringstream *stream); + static cpputils::Data _extractData(std::ostringstream *stream); + static std::istringstream _parseData(const uint8_t *data, size_t size); + static void _decodeArbitraryWords(std::istringstream *stream, std::ostringstream *decompressed); + static void _decodeIdenticalWords(std::istringstream *stream, std::ostringstream *decompressed); + }; + } +} + +#endif diff --git a/test/implementations/compressing/CompressingBlockStoreTest.cpp 
b/test/implementations/compressing/CompressingBlockStoreTest.cpp new file mode 100644 index 00000000..99a668e1 --- /dev/null +++ b/test/implementations/compressing/CompressingBlockStoreTest.cpp @@ -0,0 +1,28 @@ +#include "../../../implementations/compressing/CompressingBlockStore.h" +#include "../../../implementations/compressing/compressors/Gzip.h" +#include "../../../implementations/compressing/compressors/RunLengthEncoding.h" +#include "../../../implementations/testfake/FakeBlockStore.h" +#include "../../testutils/BlockStoreTest.h" +#include "google/gtest/gtest.h" + +using ::testing::Test; + +using blockstore::BlockStore; +using blockstore::compressing::CompressingBlockStore; +using blockstore::compressing::Gzip; +using blockstore::compressing::RunLengthEncoding; +using blockstore::testfake::FakeBlockStore; + +using cpputils::make_unique_ref; +using cpputils::unique_ref; + +template +class CompressingBlockStoreTestFixture: public BlockStoreTestFixture { +public: + unique_ref createBlockStore() override { + return make_unique_ref>(make_unique_ref()); + } +}; + +INSTANTIATE_TYPED_TEST_CASE_P(Compressing_Gzip, BlockStoreTest, CompressingBlockStoreTestFixture); +INSTANTIATE_TYPED_TEST_CASE_P(Compressing_RunLengthEncoding, BlockStoreTest, CompressingBlockStoreTestFixture); diff --git a/test/implementations/compressing/compressors/testutils/CompressorTest.cpp b/test/implementations/compressing/compressors/testutils/CompressorTest.cpp new file mode 100644 index 00000000..41fc4f26 --- /dev/null +++ b/test/implementations/compressing/compressors/testutils/CompressorTest.cpp @@ -0,0 +1,92 @@ +#include +#include "../../../../../implementations/compressing/compressors/Gzip.h" +#include "../../../../../implementations/compressing/compressors/RunLengthEncoding.h" +#include + +using namespace blockstore::compressing; +using cpputils::Data; +using cpputils::DataFixture; + +template +class CompressorTest: public ::testing::Test { +public: + void 
EXPECT_COMPRESS_AND_DECOMPRESS_IS_IDENTITY(const Data &data) { + Data compressed = Compressor::Compress(data); + Data decompressed = Compressor::Decompress(compressed.data(), compressed.size()); + EXPECT_EQ(data, decompressed); + } +}; + +TYPED_TEST_CASE_P(CompressorTest); + +TYPED_TEST_P(CompressorTest, Empty) { + Data empty(0); + this->EXPECT_COMPRESS_AND_DECOMPRESS_IS_IDENTITY(empty); +} + +TYPED_TEST_P(CompressorTest, ArbitraryData) { + Data fixture = DataFixture::generate(10240); + this->EXPECT_COMPRESS_AND_DECOMPRESS_IS_IDENTITY(fixture); +} + +TYPED_TEST_P(CompressorTest, Zeroes) { + Data zeroes(10240); + zeroes.FillWithZeroes(); + this->EXPECT_COMPRESS_AND_DECOMPRESS_IS_IDENTITY(zeroes); +} + +TYPED_TEST_P(CompressorTest, Runs) { + Data data(4096); + std::memset(data.dataOffset(0), 0xF2, 1024); + std::memset(data.dataOffset(1024), 0x00, 1024); + std::memset(data.dataOffset(2048), 0x01, 2048); + this->EXPECT_COMPRESS_AND_DECOMPRESS_IS_IDENTITY(data); +} + +TYPED_TEST_P(CompressorTest, RunsAndArbitrary) { + Data data(4096); + std::memset(data.dataOffset(0), 0xF2, 1024); + std::memcpy(data.dataOffset(1024), DataFixture::generate(1024).data(), 1024); + std::memset(data.dataOffset(2048), 0x01, 2048); + this->EXPECT_COMPRESS_AND_DECOMPRESS_IS_IDENTITY(data); +} + +TYPED_TEST_P(CompressorTest, LargeData) { + // This is larger than what fits into 16 bits (RunLengthEncoding, for example, stores run lengths in 16-bit fields) + Data fixture = DataFixture::generate(200000); + this->EXPECT_COMPRESS_AND_DECOMPRESS_IS_IDENTITY(fixture); +} + +TYPED_TEST_P(CompressorTest, LargeRuns) { + // Each run is larger than what fits into 16 bits (RunLengthEncoding, for example, stores run lengths in 16-bit fields) + constexpr size_t RUN_SIZE = 200000; + Data data(3*RUN_SIZE); + std::memset(data.dataOffset(0), 0xF2, RUN_SIZE); + std::memset(data.dataOffset(RUN_SIZE), 0x00, RUN_SIZE); + std::memset(data.dataOffset(2*RUN_SIZE), 0x01, RUN_SIZE); + 
+ this->EXPECT_COMPRESS_AND_DECOMPRESS_IS_IDENTITY(data); +} + +TYPED_TEST_P(CompressorTest, LargeRunsAndArbitrary) { + // Each run is larger than what fits into 16 bits (RunLengthEncoding, for example, stores run lengths in 16-bit fields) + constexpr size_t RUN_SIZE = 200000; + Data data(3*RUN_SIZE); + std::memset(data.dataOffset(0), 0xF2, RUN_SIZE); + std::memcpy(data.dataOffset(RUN_SIZE), DataFixture::generate(RUN_SIZE).data(), RUN_SIZE); + std::memset(data.dataOffset(2*RUN_SIZE), 0x01, RUN_SIZE); + this->EXPECT_COMPRESS_AND_DECOMPRESS_IS_IDENTITY(data); +} + +REGISTER_TYPED_TEST_CASE_P(CompressorTest, + Empty, + ArbitraryData, + Zeroes, + Runs, + RunsAndArbitrary, + LargeData, + LargeRuns, + LargeRunsAndArbitrary +); + +INSTANTIATE_TYPED_TEST_CASE_P(Gzip, CompressorTest, Gzip); +INSTANTIATE_TYPED_TEST_CASE_P(RunLengthEncoding, CompressorTest, RunLengthEncoding);