Fixed fork() issue with threads

This commit is contained in:
Sebastian Messmer 2015-10-28 15:00:24 +01:00
parent 3e01c56ad4
commit a0d4548d4c
6 changed files with 128 additions and 11 deletions

View File

@ -1,13 +1,18 @@
#include "LoopThread.h" #include "LoopThread.h"
#include "../logging/logging.h" #include "../logging/logging.h"
#include "LoopThreadForkHandler.h"
using namespace cpputils::logging; using namespace cpputils::logging;
using std::function;
namespace cpputils { namespace cpputils {
LoopThread::LoopThread(): _thread() {} LoopThread::LoopThread(function<void()> loopIteration): _thread(), _loopIteration(loopIteration) {
LoopThreadForkHandler::singleton().add(this);
}
LoopThread::~LoopThread() { LoopThread::~LoopThread() {
LoopThreadForkHandler::singleton().remove(this);
stop(); stop();
} }
@ -16,14 +21,21 @@ namespace cpputils {
} }
void LoopThread::stop() { void LoopThread::stop() {
asyncStop();
waitUntilStopped();
}
void LoopThread::asyncStop() {
_thread.interrupt(); _thread.interrupt();
}
void LoopThread::waitUntilStopped() {
_thread.join(); _thread.join();
} }
void LoopThread::main() { void LoopThread::main() {
try { try {
while(true) { while(true) {
loopIteration(); _loopIteration();
} }
} catch (const boost::thread_interrupted &e) { } catch (const boost::thread_interrupted &e) {
//Do nothing, exit thread. //Do nothing, exit thread.

View File

@ -7,18 +7,22 @@
namespace cpputils { namespace cpputils {
//TODO Test //TODO Test
//TODO Move out of "random" folder into own library folder //TODO Move out of "random" folder into own library folder
class LoopThread { // Has to be final, because otherwise there could be a race condition where LoopThreadForkHandler calls a LoopThread
// where the child class destructor already ran.
class LoopThread final {
public: public:
LoopThread(); LoopThread(std::function<void()> loopIteration);
virtual ~LoopThread(); ~LoopThread();
void start(); void start();
void stop(); void stop();
virtual void loopIteration() = 0; void asyncStop();
void waitUntilStopped();
private: private:
void main(); void main();
boost::thread _thread; boost::thread _thread;
std::function<void()> _loopIteration;
}; };
} }

View File

@ -0,0 +1,53 @@
#include "LoopThreadForkHandler.h"
#include <thread>
#include "../logging/logging.h"
#include "../assert/assert.h"
#include "LoopThread.h"
using namespace cpputils::logging;
namespace cpputils {
LoopThreadForkHandler &LoopThreadForkHandler::singleton() {
static LoopThreadForkHandler singleton;
return singleton;
}
LoopThreadForkHandler::LoopThreadForkHandler() {
//pthread_atfork(&LoopThreadForkHandler::_onBeforeFork, &LoopThreadForkHandler::_onAfterFork, &LoopThreadForkHandler::_onAfterFork);
pthread_atfork(nullptr, nullptr, &LoopThreadForkHandler::_onAfterFork);
}
void LoopThreadForkHandler::add(LoopThread *thread) {
_runningThreads.push_back(thread);
}
void LoopThreadForkHandler::remove(LoopThread *thread) {
auto found = std::find(_runningThreads.begin(), _runningThreads.end(), thread);
ASSERT(found != _runningThreads.end(), "Thread not found");
_runningThreads.erase(found);
}
void LoopThreadForkHandler::_onBeforeFork() {
singleton()._stopThreads();
}
void LoopThreadForkHandler::_stopThreads() {
for (LoopThread *thread : _runningThreads) {
thread->asyncStop();
}
for (LoopThread *thread : _runningThreads) {
thread->waitUntilStopped();
}
}
void LoopThreadForkHandler::_onAfterFork() {
singleton()._startThreads();
}
void LoopThreadForkHandler::_startThreads() {
for (LoopThread *thread : _runningThreads) {
thread->start();
}
}
}

View File

@ -0,0 +1,34 @@
#pragma once
#ifndef MESSMER_CPPUTILS_RANDOM_LOOPTHREADFORKHANDLER_H
#define MESSMER_CPPUTILS_RANDOM_LOOPTHREADFORKHANDLER_H
#include <vector>
#include "../macros.h"
namespace cpputils {
class LoopThread;
// The fork() syscall only forks the main thread.
// This class takes care that LoopThreads are also run in the child process.
class LoopThreadForkHandler {
public:
static LoopThreadForkHandler &singleton();
void add(LoopThread *thread);
void remove(LoopThread *thread);
private:
LoopThreadForkHandler();
static void _onBeforeFork();
static void _onAfterFork();
void _startThreads();
void _stopThreads();
std::vector<LoopThread*> _runningThreads;
DISALLOW_COPY_AND_ASSIGN(LoopThreadForkHandler);
};
}
#endif

View File

@ -3,11 +3,19 @@
namespace cpputils { namespace cpputils {
RandomGeneratorThread::RandomGeneratorThread(ThreadsafeRandomDataBuffer *buffer, size_t minSize, size_t maxSize) RandomGeneratorThread::RandomGeneratorThread(ThreadsafeRandomDataBuffer *buffer, size_t minSize, size_t maxSize)
: _randomGenerator(), _buffer(buffer), _minSize(minSize), _maxSize(maxSize) { : _randomGenerator(),
_buffer(buffer),
_minSize(minSize),
_maxSize(maxSize),
_thread(std::bind(&RandomGeneratorThread::_loopIteration, this)) {
ASSERT(_maxSize >= _minSize, "Invalid parameters"); ASSERT(_maxSize >= _minSize, "Invalid parameters");
} }
void RandomGeneratorThread::loopIteration() { void RandomGeneratorThread::start() {
return _thread.start();
}
void RandomGeneratorThread::_loopIteration() {
_buffer->waitUntilSizeIsLessThan(_minSize); _buffer->waitUntilSizeIsLessThan(_minSize);
size_t neededRandomDataSize = _maxSize - _buffer->size(); size_t neededRandomDataSize = _maxSize - _buffer->size();
ASSERT(_maxSize > _buffer->size(), "This could theoretically fail if another thread refilled the buffer. But we should be the only refilling thread."); ASSERT(_maxSize > _buffer->size(), "This could theoretically fail if another thread refilled the buffer. But we should be the only refilling thread.");

View File

@ -8,12 +8,14 @@
namespace cpputils { namespace cpputils {
//TODO Test //TODO Test
class RandomGeneratorThread: public LoopThread { class RandomGeneratorThread {
public: public:
RandomGeneratorThread(ThreadsafeRandomDataBuffer *buffer, size_t minSize, size_t maxSize); RandomGeneratorThread(ThreadsafeRandomDataBuffer *buffer, size_t minSize, size_t maxSize);
void loopIteration() override;
void start();
private: private:
void _loopIteration();
Data _generateRandomData(size_t size); Data _generateRandomData(size_t size);
CryptoPP::AutoSeededRandomPool _randomGenerator; CryptoPP::AutoSeededRandomPool _randomGenerator;
@ -21,6 +23,10 @@ namespace cpputils {
size_t _minSize; size_t _minSize;
size_t _maxSize; size_t _maxSize;
//This has to be the last member, because it has to be destructed first - otherwise the thread could still be
//running while the RandomGeneratorThread object is invalid.
LoopThread _thread;
DISALLOW_COPY_AND_ASSIGN(RandomGeneratorThread); DISALLOW_COPY_AND_ASSIGN(RandomGeneratorThread);
}; };
} }