From a0d4548d4c180f6eb50863f2070b2027fb698e3e Mon Sep 17 00:00:00 2001 From: Sebastian Messmer Date: Wed, 28 Oct 2015 15:00:24 +0100 Subject: [PATCH] Fixed fork() issue with threads --- random/LoopThread.cpp | 18 +++++++++-- random/LoopThread.h | 12 +++++--- random/LoopThreadForkHandler.cpp | 53 ++++++++++++++++++++++++++++++++ random/LoopThreadForkHandler.h | 34 ++++++++++++++++++++ random/RandomGeneratorThread.cpp | 12 ++++++-- random/RandomGeneratorThread.h | 10 ++++-- 6 files changed, 128 insertions(+), 11 deletions(-) create mode 100644 random/LoopThreadForkHandler.cpp create mode 100644 random/LoopThreadForkHandler.h diff --git a/random/LoopThread.cpp b/random/LoopThread.cpp index d53df0ff..488bfe27 100644 --- a/random/LoopThread.cpp +++ b/random/LoopThread.cpp @@ -1,13 +1,18 @@ #include "LoopThread.h" #include "../logging/logging.h" +#include "LoopThreadForkHandler.h" using namespace cpputils::logging; +using std::function; namespace cpputils { - LoopThread::LoopThread(): _thread() {} + LoopThread::LoopThread(function loopIteration): _thread(), _loopIteration(loopIteration) { + LoopThreadForkHandler::singleton().add(this); + } LoopThread::~LoopThread() { + LoopThreadForkHandler::singleton().remove(this); stop(); } @@ -16,14 +21,21 @@ namespace cpputils { } void LoopThread::stop() { + asyncStop(); + waitUntilStopped(); + } + + void LoopThread::asyncStop() { _thread.interrupt(); + } + void LoopThread::waitUntilStopped() { _thread.join(); } void LoopThread::main() { try { while(true) { - loopIteration(); + _loopIteration(); } } catch (const boost::thread_interrupted &e) { //Do nothing, exit thread. @@ -35,4 +47,4 @@ namespace cpputils { LOG(ERROR) << "LoopThread crashed"; } } -} \ No newline at end of file +} diff --git a/random/LoopThread.h b/random/LoopThread.h index 06443b25..ad1d68e2 100644 --- a/random/LoopThread.h +++ b/random/LoopThread.h @@ -7,18 +7,22 @@ namespace cpputils { //TODO Test //TODO Move out of "random" folder into own library folder - class LoopThread { + // Has to be final, because otherwise there could be a race condition where LoopThreadForkHandler calls a LoopThread + // where the child class destructor already ran. + class LoopThread final { public: - LoopThread(); - virtual ~LoopThread(); + LoopThread(std::function loopIteration); + ~LoopThread(); void start(); void stop(); - virtual void loopIteration() = 0; + void asyncStop(); + void waitUntilStopped(); private: void main(); boost::thread _thread; + std::function _loopIteration; }; } diff --git a/random/LoopThreadForkHandler.cpp b/random/LoopThreadForkHandler.cpp new file mode 100644 index 00000000..40e6cda3 --- /dev/null +++ b/random/LoopThreadForkHandler.cpp @@ -0,0 +1,53 @@ +#include "LoopThreadForkHandler.h" +#include +#include "../logging/logging.h" +#include "../assert/assert.h" +#include "LoopThread.h" + +using namespace cpputils::logging; + +namespace cpputils { + LoopThreadForkHandler &LoopThreadForkHandler::singleton() { + static LoopThreadForkHandler singleton; + return singleton; + } + + LoopThreadForkHandler::LoopThreadForkHandler() { + //pthread_atfork(&LoopThreadForkHandler::_onBeforeFork, &LoopThreadForkHandler::_onAfterFork, &LoopThreadForkHandler::_onAfterFork); + pthread_atfork(nullptr, nullptr, &LoopThreadForkHandler::_onAfterFork); + } + + void LoopThreadForkHandler::add(LoopThread *thread) { + _runningThreads.push_back(thread); + } + + void LoopThreadForkHandler::remove(LoopThread *thread) { + auto found = std::find(_runningThreads.begin(), _runningThreads.end(), thread); + ASSERT(found != _runningThreads.end(), "Thread not found"); + _runningThreads.erase(found); + } + + void LoopThreadForkHandler::_onBeforeFork() { + singleton()._stopThreads(); + } + + void LoopThreadForkHandler::_stopThreads() { + for (LoopThread *thread : _runningThreads) { + thread->asyncStop(); + } + for (LoopThread *thread : _runningThreads) { + thread->waitUntilStopped(); + } + } + + void LoopThreadForkHandler::_onAfterFork() { + singleton()._startThreads(); + } + + void LoopThreadForkHandler::_startThreads() { + for (LoopThread *thread : _runningThreads) { + thread->start(); + } + } + +} diff --git a/random/LoopThreadForkHandler.h b/random/LoopThreadForkHandler.h new file mode 100644 index 00000000..3875205b --- /dev/null +++ b/random/LoopThreadForkHandler.h @@ -0,0 +1,34 @@ +#pragma once +#ifndef MESSMER_CPPUTILS_RANDOM_LOOPTHREADFORKHANDLER_H +#define MESSMER_CPPUTILS_RANDOM_LOOPTHREADFORKHANDLER_H + +#include +#include "../macros.h" + +namespace cpputils { + class LoopThread; + + // The fork() syscall only forks the main thread. + // This class takes care that LoopThreads are also run in the child process. + class LoopThreadForkHandler { + public: + static LoopThreadForkHandler &singleton(); + + void add(LoopThread *thread); + void remove(LoopThread *thread); + + private: + LoopThreadForkHandler(); + static void _onBeforeFork(); + static void _onAfterFork(); + + void _startThreads(); + void _stopThreads(); + + std::vector _runningThreads; + + DISALLOW_COPY_AND_ASSIGN(LoopThreadForkHandler); + }; +} + +#endif diff --git a/random/RandomGeneratorThread.cpp b/random/RandomGeneratorThread.cpp index 671fd692..8c322f53 100644 --- a/random/RandomGeneratorThread.cpp +++ b/random/RandomGeneratorThread.cpp @@ -3,11 +3,19 @@ namespace cpputils { RandomGeneratorThread::RandomGeneratorThread(ThreadsafeRandomDataBuffer *buffer, size_t minSize, size_t maxSize) - : _randomGenerator(), _buffer(buffer), _minSize(minSize), _maxSize(maxSize) { + : _randomGenerator(), + _buffer(buffer), + _minSize(minSize), + _maxSize(maxSize), + _thread(std::bind(&RandomGeneratorThread::_loopIteration, this)) { ASSERT(_maxSize >= _minSize, "Invalid parameters"); } - void RandomGeneratorThread::loopIteration() { + void RandomGeneratorThread::start() { + return _thread.start(); + } + + void RandomGeneratorThread::_loopIteration() { _buffer->waitUntilSizeIsLessThan(_minSize); size_t neededRandomDataSize = _maxSize - _buffer->size(); ASSERT(_maxSize > _buffer->size(), "This could theoretically fail if another thread refilled the buffer. But we should be the only refilling thread."); diff --git a/random/RandomGeneratorThread.h b/random/RandomGeneratorThread.h index 4b358a63..4f5d1fdc 100644 --- a/random/RandomGeneratorThread.h +++ b/random/RandomGeneratorThread.h @@ -8,12 +8,14 @@ namespace cpputils { //TODO Test - class RandomGeneratorThread: public LoopThread { + class RandomGeneratorThread { public: RandomGeneratorThread(ThreadsafeRandomDataBuffer *buffer, size_t minSize, size_t maxSize); - void loopIteration() override; + + void start(); private: + void _loopIteration(); Data _generateRandomData(size_t size); CryptoPP::AutoSeededRandomPool _randomGenerator; @@ -21,6 +23,10 @@ namespace cpputils { size_t _minSize; size_t _maxSize; + //This has to be the last member, because it has to be destructed first - otherwise the thread could still be + //running while the RandomGeneratorThread object is invalid. + LoopThread _thread; + DISALLOW_COPY_AND_ASSIGN(RandomGeneratorThread); }; }