Store own client id into the block next to the version number. This will be used to fix synchronization conflicts where the version number on one clients progresses slower than on another client, but synchronizes later.

This commit is contained in:
Sebastian Messmer 2016-06-21 17:02:32 -07:00
parent a5391a854d
commit 57af168cfd
3 changed files with 34 additions and 17 deletions

View File

@ -1,4 +1,5 @@
#include <fstream>
#include <cpp-utils/random/Random.h>
#include "KnownBlockVersions.h"
namespace bf = boost::filesystem;
@ -7,6 +8,7 @@ using std::pair;
using std::string;
using boost::optional;
using boost::none;
using cpputils::Random;
namespace blockstore {
namespace versioncounting {
@ -14,11 +16,12 @@ namespace versioncounting {
const string KnownBlockVersions::HEADER = "cryfs.integritydata.knownblockversions;0\0";
KnownBlockVersions::KnownBlockVersions(const bf::path &stateFilePath)
:_knownVersions(_loadStateFile(stateFilePath)), _stateFilePath(stateFilePath), _valid(true) {
:_knownVersions(), _stateFilePath(stateFilePath), _myClientId(0), _valid(true) {
_loadStateFile();
}
KnownBlockVersions::KnownBlockVersions(KnownBlockVersions &&rhs)
: _knownVersions(std::move(rhs._knownVersions)), _stateFilePath(std::move(rhs._stateFilePath)), _valid(true) {
: _knownVersions(std::move(rhs._knownVersions)), _stateFilePath(std::move(rhs._stateFilePath)), _myClientId(rhs._myClientId), _valid(true) {
rhs._valid = false;
}
@ -51,21 +54,24 @@ void KnownBlockVersions::updateVersion(const Key &key, uint64_t version) {
}
}
unordered_map<Key, uint64_t> KnownBlockVersions::_loadStateFile(const bf::path &stateFilePath) {
std::ifstream file(stateFilePath.native().c_str());
void KnownBlockVersions::_loadStateFile() {
std::ifstream file(_stateFilePath.native().c_str());
if (!file.good()) {
return unordered_map<Key, uint64_t>();
// File doesn't exist means we loaded empty state. Assign a random client id.
_myClientId = *reinterpret_cast<uint32_t*>(Random::PseudoRandom().getFixedSize<sizeof(uint32_t)>().data());
return;
}
_checkHeader(&file);
file.read((char*)&_myClientId, sizeof(_myClientId));
ASSERT(file.good(), "Error reading file");
unordered_map<Key, uint64_t> result;
_knownVersions.clear();
optional<pair<Key, uint64_t>> entry = _readEntry(&file);
while(none != entry) {
result.insert(*entry);
_knownVersions.insert(*entry);
entry = _readEntry(&file);
}
ASSERT(file.eof(), "Didn't read until end of file");
return result;
};
void KnownBlockVersions::_checkHeader(std::ifstream *file) {
@ -95,11 +101,16 @@ optional<pair<Key, uint64_t>> KnownBlockVersions::_readEntry(std::ifstream *file
void KnownBlockVersions::_saveStateFile() const {
std::ofstream file(_stateFilePath.native().c_str());
file.write(HEADER.c_str(), HEADER.size());
file.write((char*)&_myClientId, sizeof(_myClientId));
for (const auto &entry : _knownVersions) {
file.write((char*)entry.first.data(), entry.first.BINARY_LENGTH);
file.write((char*)&entry.second, sizeof(entry.second));
}
}
uint32_t KnownBlockVersions::myClientId() const {
return _myClientId;
}
}
}

View File

@ -21,14 +21,17 @@ namespace blockstore {
void updateVersion(const Key &key, uint64_t version);
uint32_t myClientId() const;
private:
std::unordered_map<Key, uint64_t> _knownVersions;
boost::filesystem::path _stateFilePath;
uint32_t _myClientId;
bool _valid;
static const std::string HEADER;
static std::unordered_map<Key, uint64_t> _loadStateFile(const boost::filesystem::path &stateFilePath);
void _loadStateFile();
static void _checkHeader(std::ifstream *file);
static boost::optional<std::pair<Key, uint64_t>> _readEntry(std::ifstream *file);
void _saveStateFile() const;

View File

@ -51,7 +51,7 @@ private:
bool _dataChanged;
void _storeToBaseBlock();
static cpputils::Data _prependHeaderToData(uint64_t version, cpputils::Data data);
static cpputils::Data _prependHeaderToData(uint32_t myClientId, uint64_t version, cpputils::Data data);
static void _checkFormatHeader(const cpputils::Data &data);
static uint64_t _readVersion(const cpputils::Data &data);
static bool _versionIsNondecreasing(const Key &key, uint64_t version, KnownBlockVersions *knownBlockVersions);
@ -59,7 +59,7 @@ private:
// This header is prepended to blocks to allow future versions to have compatibility.
static constexpr uint16_t FORMAT_VERSION_HEADER = 0;
static constexpr uint64_t VERSION_ZERO = 0;
static constexpr unsigned int HEADER_LENGTH = sizeof(FORMAT_VERSION_HEADER) + sizeof(VERSION_ZERO);
static constexpr unsigned int HEADER_LENGTH = sizeof(FORMAT_VERSION_HEADER) + sizeof(uint32_t) + sizeof(VERSION_ZERO);
std::mutex _mutex;
@ -68,7 +68,7 @@ private:
inline boost::optional<cpputils::unique_ref<VersionCountingBlock>> VersionCountingBlock::TryCreateNew(BlockStore *baseBlockStore, const Key &key, cpputils::Data data, KnownBlockVersions *knownBlockVersions) {
cpputils::Data dataWithHeader = _prependHeaderToData(VERSION_ZERO, std::move(data));
cpputils::Data dataWithHeader = _prependHeaderToData(knownBlockVersions->myClientId(), VERSION_ZERO, std::move(data));
auto baseBlock = baseBlockStore->tryCreate(key, dataWithHeader.copy()); // TODO Copy necessary?
if (baseBlock == boost::none) {
//TODO Test this code branch
@ -79,11 +79,12 @@ inline boost::optional<cpputils::unique_ref<VersionCountingBlock>> VersionCounti
return cpputils::make_unique_ref<VersionCountingBlock>(std::move(*baseBlock), std::move(dataWithHeader), VERSION_ZERO, knownBlockVersions);
}
inline cpputils::Data VersionCountingBlock::_prependHeaderToData(const uint64_t version, cpputils::Data data) {
static_assert(HEADER_LENGTH == sizeof(FORMAT_VERSION_HEADER) + sizeof(version), "Wrong header length");
inline cpputils::Data VersionCountingBlock::_prependHeaderToData(uint32_t myClientId, uint64_t version, cpputils::Data data) {
static_assert(HEADER_LENGTH == sizeof(FORMAT_VERSION_HEADER) + sizeof(myClientId) + sizeof(version), "Wrong header length");
cpputils::Data result(data.size() + HEADER_LENGTH);
std::memcpy(result.dataOffset(0), &FORMAT_VERSION_HEADER, sizeof(FORMAT_VERSION_HEADER));
std::memcpy(result.dataOffset(sizeof(FORMAT_VERSION_HEADER)), &version, sizeof(version));
std::memcpy(result.dataOffset(sizeof(FORMAT_VERSION_HEADER)), &myClientId, sizeof(myClientId));
std::memcpy(result.dataOffset(sizeof(FORMAT_VERSION_HEADER)+sizeof(myClientId)), &version, sizeof(version));
std::memcpy((uint8_t*)result.dataOffset(HEADER_LENGTH), data.data(), data.size());
return result;
}
@ -109,7 +110,7 @@ inline void VersionCountingBlock::_checkFormatHeader(const cpputils::Data &data)
inline uint64_t VersionCountingBlock::_readVersion(const cpputils::Data &data) {
uint64_t version;
std::memcpy(&version, data.dataOffset(sizeof(FORMAT_VERSION_HEADER)), sizeof(version));
std::memcpy(&version, data.dataOffset(sizeof(FORMAT_VERSION_HEADER) + sizeof(uint32_t)), sizeof(version));
return version;
}
@ -160,7 +161,9 @@ inline void VersionCountingBlock::resize(size_t newSize) {
inline void VersionCountingBlock::_storeToBaseBlock() {
if (_dataChanged) {
++_version;
std::memcpy(_dataWithHeader.dataOffset(sizeof(FORMAT_VERSION_HEADER)), &_version, sizeof(_version));
uint32_t myClientId = _knownBlockVersions->myClientId();
std::memcpy(_dataWithHeader.dataOffset(sizeof(FORMAT_VERSION_HEADER)), &myClientId, sizeof(myClientId));
std::memcpy(_dataWithHeader.dataOffset(sizeof(FORMAT_VERSION_HEADER) + sizeof(myClientId)), &_version, sizeof(_version));
_baseBlock->write(_dataWithHeader.data(), 0, _dataWithHeader.size());
_knownBlockVersions->updateVersion(key(), _version);
_dataChanged = false;