Merge parallelaccessstore repository
This commit is contained in:
commit
4202e6be89
@ -34,5 +34,6 @@ script:
|
||||
- make -j2
|
||||
- ./test/cpp-utils-test
|
||||
- ../run_with_fuse.sh ./test/fspp-test
|
||||
- ./test/parallelaccessstore-test
|
||||
after_script:
|
||||
- rm run_with_fuse.sh
|
||||
|
20
src/parallelaccessstore/ParallelAccessBaseStore.h
Normal file
20
src/parallelaccessstore/ParallelAccessBaseStore.h
Normal file
@ -0,0 +1,20 @@
|
||||
#pragma once
|
||||
#ifndef MESSMER_PARALLELACCESSSTORE_PARALLELACCESSBASESTORE_H_
|
||||
#define MESSMER_PARALLELACCESSSTORE_PARALLELACCESSBASESTORE_H_
|
||||
|
||||
#include <cpp-utils/pointer/unique_ref.h>
|
||||
#include <boost/optional.hpp>
|
||||
|
||||
namespace parallelaccessstore {
|
||||
|
||||
template<class Resource, class Key>
|
||||
class ParallelAccessBaseStore {
|
||||
public:
|
||||
virtual ~ParallelAccessBaseStore() {}
|
||||
virtual boost::optional<cpputils::unique_ref<Resource>> loadFromBaseStore(const Key &key) = 0;
|
||||
virtual void removeFromBaseStore(cpputils::unique_ref<Resource> block) = 0;
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
#endif
|
204
src/parallelaccessstore/ParallelAccessStore.h
Normal file
204
src/parallelaccessstore/ParallelAccessStore.h
Normal file
@ -0,0 +1,204 @@
|
||||
#pragma once
|
||||
#ifndef MESSMER_PARALLELACCESSSTORE_PARALLELACCESSSTORE_H_
|
||||
#define MESSMER_PARALLELACCESSSTORE_PARALLELACCESSSTORE_H_
|
||||
|
||||
#include <mutex>
|
||||
#include <memory>
|
||||
#include <map>
|
||||
#include <unordered_map>
|
||||
#include <future>
|
||||
#include <cassert>
|
||||
#include <type_traits>
|
||||
#include <cpp-utils/macros.h>
|
||||
#include "ParallelAccessBaseStore.h"
|
||||
#include <cpp-utils/assert/assert.h>
|
||||
|
||||
|
||||
//TODO Refactor
|
||||
//TODO Test cases
|
||||
|
||||
namespace parallelaccessstore {
|
||||
|
||||
template<class Resource, class ResourceRef, class Key>
|
||||
class ParallelAccessStore final {
|
||||
public:
|
||||
explicit ParallelAccessStore(cpputils::unique_ref<ParallelAccessBaseStore<Resource, Key>> baseStore);
|
||||
~ParallelAccessStore() {
|
||||
ASSERT(_openResources.size() == 0, "Still resources open when trying to destruct");
|
||||
ASSERT(_resourcesToRemove.size() == 0, "Still resources to remove when trying to destruct");
|
||||
};
|
||||
|
||||
class ResourceRefBase {
|
||||
public:
|
||||
//TODO Better way to initialize
|
||||
ResourceRefBase(): _parallelAccessStore(nullptr), _key(Key::Null()) {}
|
||||
void init(ParallelAccessStore *parallelAccessStore, const Key &key) {
|
||||
_parallelAccessStore = parallelAccessStore;
|
||||
_key = key;
|
||||
}
|
||||
virtual ~ResourceRefBase() {
|
||||
_parallelAccessStore->release(_key);
|
||||
}
|
||||
private:
|
||||
ParallelAccessStore *_parallelAccessStore;
|
||||
//TODO We're storing Key twice (here and in the base resource). Rather use getKey() on the base resource if possible somehow.
|
||||
Key _key;
|
||||
|
||||
DISALLOW_COPY_AND_ASSIGN(ResourceRefBase);
|
||||
};
|
||||
|
||||
bool isOpened(const Key &key) const;
|
||||
cpputils::unique_ref<ResourceRef> add(const Key &key, cpputils::unique_ref<Resource> resource);
|
||||
template<class ActualResourceRef>
|
||||
cpputils::unique_ref<ActualResourceRef> add(const Key &key, cpputils::unique_ref<Resource> resource, std::function<cpputils::unique_ref<ActualResourceRef>(Resource*)> createResourceRef);
|
||||
boost::optional<cpputils::unique_ref<ResourceRef>> load(const Key &key);
|
||||
boost::optional<cpputils::unique_ref<ResourceRef>> load(const Key &key, std::function<cpputils::unique_ref<ResourceRef>(Resource*)> createResourceRef);
|
||||
void remove(const Key &key, cpputils::unique_ref<ResourceRef> block);
|
||||
|
||||
private:
|
||||
class OpenResource final {
|
||||
public:
|
||||
OpenResource(cpputils::unique_ref<Resource> resource): _resource(std::move(resource)), _refCount(0) {}
|
||||
OpenResource(OpenResource &&rhs) = default;
|
||||
|
||||
Resource *getReference() {
|
||||
++_refCount;
|
||||
return _resource.get();
|
||||
}
|
||||
|
||||
void releaseReference() {
|
||||
--_refCount;
|
||||
}
|
||||
|
||||
bool refCountIsZero() const {
|
||||
return 0 == _refCount;
|
||||
}
|
||||
|
||||
cpputils::unique_ref<Resource> moveResourceOut() {
|
||||
return std::move(_resource);
|
||||
}
|
||||
private:
|
||||
cpputils::unique_ref<Resource> _resource;
|
||||
uint32_t _refCount;
|
||||
|
||||
DISALLOW_COPY_AND_ASSIGN(OpenResource);
|
||||
};
|
||||
|
||||
mutable std::mutex _mutex;
|
||||
cpputils::unique_ref<ParallelAccessBaseStore<Resource, Key>> _baseStore;
|
||||
|
||||
std::unordered_map<Key, OpenResource> _openResources;
|
||||
std::map<Key, std::promise<cpputils::unique_ref<Resource>>> _resourcesToRemove;
|
||||
|
||||
template<class ActualResourceRef>
|
||||
cpputils::unique_ref<ActualResourceRef> _add(const Key &key, cpputils::unique_ref<Resource> resource, std::function<cpputils::unique_ref<ActualResourceRef>(Resource*)> createResourceRef);
|
||||
|
||||
void release(const Key &key);
|
||||
friend class CachedResource;
|
||||
|
||||
DISALLOW_COPY_AND_ASSIGN(ParallelAccessStore);
|
||||
};
|
||||
|
||||
template<class Resource, class ResourceRef, class Key>
|
||||
ParallelAccessStore<Resource, ResourceRef, Key>::ParallelAccessStore(cpputils::unique_ref<ParallelAccessBaseStore<Resource, Key>> baseStore)
|
||||
: _mutex(),
|
||||
_baseStore(std::move(baseStore)),
|
||||
_openResources(),
|
||||
_resourcesToRemove() {
|
||||
static_assert(std::is_base_of<ResourceRefBase, ResourceRef>::value, "ResourceRef must inherit from ResourceRefBase");
|
||||
}
|
||||
|
||||
template<class Resource, class ResourceRef, class Key>
|
||||
bool ParallelAccessStore<Resource, ResourceRef, Key>::isOpened(const Key &key) const {
|
||||
std::lock_guard<std::mutex> lock(_mutex);
|
||||
return _openResources.find(key) != _openResources.end();
|
||||
};
|
||||
|
||||
template<class Resource, class ResourceRef, class Key>
|
||||
cpputils::unique_ref<ResourceRef> ParallelAccessStore<Resource, ResourceRef, Key>::add(const Key &key, cpputils::unique_ref<Resource> resource) {
|
||||
return add<ResourceRef>(key, std::move(resource), [] (Resource *resource) {
|
||||
return cpputils::make_unique_ref<ResourceRef>(resource);
|
||||
});
|
||||
}
|
||||
|
||||
template<class Resource, class ResourceRef, class Key>
|
||||
template<class ActualResourceRef>
|
||||
cpputils::unique_ref<ActualResourceRef> ParallelAccessStore<Resource, ResourceRef, Key>::add(const Key &key, cpputils::unique_ref<Resource> resource, std::function<cpputils::unique_ref<ActualResourceRef>(Resource*)> createResourceRef) {
|
||||
static_assert(std::is_base_of<ResourceRef, ActualResourceRef>::value, "Wrong ResourceRef type");
|
||||
std::lock_guard<std::mutex> lock(_mutex);
|
||||
return _add<ActualResourceRef>(key, std::move(resource), createResourceRef);
|
||||
}
|
||||
|
||||
template<class Resource, class ResourceRef, class Key>
|
||||
template<class ActualResourceRef>
|
||||
cpputils::unique_ref<ActualResourceRef> ParallelAccessStore<Resource, ResourceRef, Key>::_add(const Key &key, cpputils::unique_ref<Resource> resource, std::function<cpputils::unique_ref<ActualResourceRef>(Resource*)> createResourceRef) {
|
||||
static_assert(std::is_base_of<ResourceRef, ActualResourceRef>::value, "Wrong ResourceRef type");
|
||||
auto insertResult = _openResources.emplace(key, std::move(resource));
|
||||
ASSERT(true == insertResult.second, "Inserting failed. Already exists.");
|
||||
auto resourceRef = createResourceRef(insertResult.first->second.getReference());
|
||||
resourceRef->init(this, key);
|
||||
return resourceRef;
|
||||
}
|
||||
|
||||
template<class Resource, class ResourceRef, class Key>
|
||||
boost::optional<cpputils::unique_ref<ResourceRef>> ParallelAccessStore<Resource, ResourceRef, Key>::load(const Key &key) {
|
||||
return load(key, [] (Resource *res) {
|
||||
return cpputils::make_unique_ref<ResourceRef>(res);
|
||||
});
|
||||
};
|
||||
|
||||
template<class Resource, class ResourceRef, class Key>
|
||||
boost::optional<cpputils::unique_ref<ResourceRef>> ParallelAccessStore<Resource, ResourceRef, Key>::load(const Key &key, std::function<cpputils::unique_ref<ResourceRef>(Resource*)> createResourceRef) {
|
||||
//TODO This lock doesn't allow loading different blocks in parallel. Can we only lock the requested key?
|
||||
std::lock_guard<std::mutex> lock(_mutex);
|
||||
auto found = _openResources.find(key);
|
||||
if (found == _openResources.end()) {
|
||||
auto resource = _baseStore->loadFromBaseStore(key);
|
||||
if (resource == boost::none) {
|
||||
return boost::none;
|
||||
}
|
||||
return _add(key, std::move(*resource), createResourceRef);
|
||||
} else {
|
||||
auto resourceRef = createResourceRef(found->second.getReference());
|
||||
resourceRef->init(this, key);
|
||||
return std::move(resourceRef);
|
||||
}
|
||||
}
|
||||
|
||||
template<class Resource, class ResourceRef, class Key>
|
||||
void ParallelAccessStore<Resource, ResourceRef, Key>::remove(const Key &key, cpputils::unique_ref<ResourceRef> resource) {
|
||||
std::future<cpputils::unique_ref<Resource>> resourceToRemoveFuture;
|
||||
{
|
||||
std::lock_guard <std::mutex> lock(_mutex); // TODO Lock needed for _resourcesToRemove?
|
||||
auto insertResult = _resourcesToRemove.emplace(key, std::promise < cpputils::unique_ref < Resource >> ());
|
||||
ASSERT(true == insertResult.second, "Inserting failed");
|
||||
resourceToRemoveFuture = insertResult.first->second.get_future();
|
||||
}
|
||||
cpputils::destruct(std::move(resource));
|
||||
//Wait for last resource user to release it
|
||||
auto resourceToRemove = resourceToRemoveFuture.get();
|
||||
|
||||
std::lock_guard<std::mutex> lock(_mutex); // TODO Just added this as a precaution on a whim, but I seriously need to rethink locking here.
|
||||
_resourcesToRemove.erase(key); //TODO Is this erase causing a race condition?
|
||||
|
||||
_baseStore->removeFromBaseStore(std::move(resourceToRemove));
|
||||
}
|
||||
|
||||
template<class Resource, class ResourceRef, class Key>
|
||||
void ParallelAccessStore<Resource, ResourceRef, Key>::release(const Key &key) {
|
||||
std::lock_guard<std::mutex> lock(_mutex);
|
||||
auto found = _openResources.find(key);
|
||||
ASSERT(found != _openResources.end(), "Didn't find key");
|
||||
found->second.releaseReference();
|
||||
if (found->second.refCountIsZero()) {
|
||||
auto foundToRemove = _resourcesToRemove.find(key);
|
||||
if (foundToRemove != _resourcesToRemove.end()) {
|
||||
foundToRemove->second.set_value(found->second.moveResourceOut());
|
||||
}
|
||||
_openResources.erase(found);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
#endif
|
@ -5,4 +5,5 @@ if (BUILD_TESTING)
|
||||
|
||||
add_subdirectory(cpp-utils)
|
||||
add_subdirectory(fspp)
|
||||
add_subdirectory(parallelaccessstore)
|
||||
endif(BUILD_TESTING)
|
||||
|
13
test/parallelaccessstore/CMakeLists.txt
Normal file
13
test/parallelaccessstore/CMakeLists.txt
Normal file
@ -0,0 +1,13 @@
|
||||
project (parallelaccessstore-test)
|
||||
|
||||
set(SOURCES
|
||||
ParallelAccessBaseStoreTest.cpp
|
||||
DummyTest.cpp
|
||||
)
|
||||
|
||||
add_executable(${PROJECT_NAME} ${SOURCES})
|
||||
target_link_libraries(${PROJECT_NAME} googletest)
|
||||
add_test(${PROJECT_NAME} ${PROJECT_NAME})
|
||||
|
||||
target_enable_style_warnings(${PROJECT_NAME})
|
||||
target_activate_cpp14(${PROJECT_NAME})
|
5
test/parallelaccessstore/DummyTest.cpp
Normal file
5
test/parallelaccessstore/DummyTest.cpp
Normal file
@ -0,0 +1,5 @@
|
||||
#include <gtest/gtest.h>
|
||||
|
||||
TEST(Dummy, DummyTest) {
|
||||
}
|
||||
|
3
test/parallelaccessstore/ParallelAccessBaseStoreTest.cpp
Normal file
3
test/parallelaccessstore/ParallelAccessBaseStoreTest.cpp
Normal file
@ -0,0 +1,3 @@
|
||||
#include "parallelaccessstore/ParallelAccessBaseStore.h"
|
||||
|
||||
// Test that ParallelAccessBaseStore.h can be included without errors
|
Loading…
x
Reference in New Issue
Block a user