Added new LoopThread implementation

This commit is contained in:
Sebastian Messmer 2015-11-07 00:00:25 -08:00
parent 5575509594
commit 236ea8d243
9 changed files with 153 additions and 148 deletions

View File

@ -6,6 +6,7 @@
#include <condition_variable>
//TODO Test
//TODO Merge lock folder with thread folder
namespace cpputils {
// Like a condition variable, but without spurious wakeups.

View File

@ -1,50 +0,0 @@
#include "LoopThread.h"
#include "../logging/logging.h"
#include "LoopThreadForkHandler.h"
using namespace cpputils::logging;
using std::function;
namespace cpputils {
LoopThread::LoopThread(function<void()> loopIteration): _thread(), _loopIteration(loopIteration) {
LoopThreadForkHandler::singleton().add(this);
}
LoopThread::~LoopThread() {
LoopThreadForkHandler::singleton().remove(this);
stop();
}
void LoopThread::start() {
_thread = boost::thread(std::bind(&LoopThread::main, this));
}
void LoopThread::stop() {
asyncStop();
waitUntilStopped();
}
void LoopThread::asyncStop() {
_thread.interrupt();
}
void LoopThread::waitUntilStopped() {
_thread.join();
}
void LoopThread::main() {
try {
while(true) {
_loopIteration();
}
} catch (const boost::thread_interrupted &e) {
//Do nothing, exit thread.
} catch (const std::exception &e) {
//TODO Think about logging
LOG(ERROR) << "LoopThread crashed: " << e.what();
} catch (...) {
//TODO Think about logging
LOG(ERROR) << "LoopThread crashed";
}
}
}

View File

@ -1,54 +0,0 @@
#include "LoopThreadForkHandler.h"
#include <thread>
#include "../logging/logging.h"
#include "../assert/assert.h"
#include "LoopThread.h"
using namespace cpputils::logging;
namespace cpputils {
LoopThreadForkHandler &LoopThreadForkHandler::singleton() {
static LoopThreadForkHandler singleton;
return singleton;
}
LoopThreadForkHandler::LoopThreadForkHandler() {
//Stopping the thread before fork() (and then also restarting it in the parent thread after fork()) is important,
//because as a running thread it might hold locks or condition variables that won't play well when forked.
pthread_atfork(&LoopThreadForkHandler::_onBeforeFork, &LoopThreadForkHandler::_onAfterFork, &LoopThreadForkHandler::_onAfterFork);
}
void LoopThreadForkHandler::add(LoopThread *thread) {
_runningThreads.push_back(thread);
}
void LoopThreadForkHandler::remove(LoopThread *thread) {
auto found = std::find(_runningThreads.begin(), _runningThreads.end(), thread);
ASSERT(found != _runningThreads.end(), "Thread not found");
_runningThreads.erase(found);
}
void LoopThreadForkHandler::_onBeforeFork() {
singleton()._stopThreads();
}
void LoopThreadForkHandler::_stopThreads() {
for (LoopThread *thread : _runningThreads) {
thread->asyncStop();
}
for (LoopThread *thread : _runningThreads) {
thread->waitUntilStopped();
}
}
void LoopThreadForkHandler::_onAfterFork() {
singleton()._startThreads();
}
void LoopThreadForkHandler::_startThreads() {
for (LoopThread *thread : _runningThreads) {
thread->start();
}
}
}

View File

@ -1,34 +0,0 @@
#pragma once
#ifndef MESSMER_CPPUTILS_RANDOM_LOOPTHREADFORKHANDLER_H
#define MESSMER_CPPUTILS_RANDOM_LOOPTHREADFORKHANDLER_H
#include <vector>
#include "../macros.h"
namespace cpputils {
class LoopThread;
// The fork() syscall only forks the main thread.
// This class takes care that LoopThreads are also run in the child process.
class LoopThreadForkHandler {
public:
static LoopThreadForkHandler &singleton();
void add(LoopThread *thread);
void remove(LoopThread *thread);
private:
LoopThreadForkHandler();
static void _onBeforeFork();
static void _onAfterFork();
void _startThreads();
void _stopThreads();
std::vector<LoopThread*> _runningThreads;
DISALLOW_COPY_AND_ASSIGN(LoopThreadForkHandler);
};
}
#endif

View File

@ -2,7 +2,7 @@
#ifndef MESSMER_CPPUTILS_RANDOM_RANDOMGENERATORTHREAD_H
#define MESSMER_CPPUTILS_RANDOM_RANDOMGENERATORTHREAD_H
#include "LoopThread.h"
#include "../thread/LoopThread.h"
#include "ThreadsafeRandomDataBuffer.h"
#include <cryptopp/cryptopp/osrng.h>

29
thread/LoopThread.cpp Normal file
View File

@ -0,0 +1,29 @@
#include "LoopThread.h"
#include "../logging/logging.h"
using std::function;
using boost::none;
namespace cpputils {
LoopThread::LoopThread(function<void()> loopIteration): _loopIteration(loopIteration), _runningHandle(none) {
}
LoopThread::~LoopThread() {
if (_runningHandle != none) {
stop();
}
}
void LoopThread::start() {
_runningHandle = ThreadSystem::singleton().start(_loopIteration);
}
void LoopThread::stop() {
if (_runningHandle == none) {
throw std::runtime_error("LoopThread is not running");
}
ThreadSystem::singleton().stop(*_runningHandle);
_runningHandle = none;
}
}

View File

@ -1,12 +1,12 @@
#pragma once
#ifndef MESSMER_CPPUTILS_RANDOM_LOOPTHREAD_H
#define MESSMER_CPPUTILS_RANDOM_LOOPTHREAD_H
#ifndef MESSMER_CPPUTILS_THREAD_LOOPTHREAD_H
#define MESSMER_CPPUTILS_THREAD_LOOPTHREAD_H
#include <boost/thread.hpp>
#include "ThreadSystem.h"
#include <boost/optional.hpp>
namespace cpputils {
//TODO Test
//TODO Move out of "random" folder into own library folder
//TODO Test that fork() doesn't destroy anything (e.g. no deadlock on stop() because thread is not running anymore)
// Has to be final, because otherwise there could be a race condition where LoopThreadForkHandler calls a LoopThread
@ -18,13 +18,9 @@ namespace cpputils {
void start();
void stop();
void asyncStop();
void waitUntilStopped();
private:
void main();
boost::thread _thread;
std::function<void()> _loopIteration;
boost::optional<ThreadSystem::Handle> _runningHandle;
};
}

73
thread/ThreadSystem.cpp Normal file
View File

@ -0,0 +1,73 @@
#include "ThreadSystem.h"
#include "../logging/logging.h"
using std::function;
using namespace cpputils::logging;
namespace cpputils {
ThreadSystem &ThreadSystem::singleton() {
static ThreadSystem system;
return system;
}
ThreadSystem::ThreadSystem(): _runningThreads() {
//Stopping the thread before fork() (and then also restarting it in the parent thread after fork()) is important,
//because as a running thread it might hold locks or condition variables that won't play well when forked.
pthread_atfork(&ThreadSystem::_onBeforeFork, &ThreadSystem::_onAfterFork, &ThreadSystem::_onAfterFork);
}
ThreadSystem::Handle ThreadSystem::start(function<void()> loopIteration) {
auto thread = _startThread(loopIteration);
_runningThreads.push_back(RunningThread{loopIteration, std::move(thread)});
return std::prev(_runningThreads.end());
}
void ThreadSystem::stop(Handle handle) {
handle->thread.interrupt();
handle->thread.join();
_runningThreads.erase(handle);
}
void ThreadSystem::_onBeforeFork() {
singleton()._stopAllThreadsForRestart();
}
void ThreadSystem::_onAfterFork() {
singleton()._restartAllThreads();
}
void ThreadSystem::_stopAllThreadsForRestart() {
for (RunningThread &thread : _runningThreads) {
thread.thread.interrupt();
}
for (RunningThread &thread : _runningThreads) {
thread.thread.join();
}
}
void ThreadSystem::_restartAllThreads() {
for (RunningThread &thread : _runningThreads) {
thread.thread = _startThread(thread.loopIteration);
}
}
boost::thread ThreadSystem::_startThread(function<void()> loopIteration) {
return boost::thread(std::bind(&ThreadSystem::_runThread, loopIteration));
}
void ThreadSystem::_runThread(function<void()> loopIteration) {
try {
while(true) {
boost::this_thread::interruption_point();
loopIteration(); // This might also be interrupted.
}
} catch (const boost::thread_interrupted &e) {
//Do nothing, exit thread.
} catch (const std::exception &e) {
LOG(ERROR) << "LoopThread crashed: " << e.what();
} catch (...) {
LOG(ERROR) << "LoopThread crashed";
}
}
}

44
thread/ThreadSystem.h Normal file
View File

@ -0,0 +1,44 @@
#pragma once
#ifndef MESSMER_CPPUTILS_THREAD_THREADSYSTEM_H
#define MESSMER_CPPUTILS_THREAD_THREADSYSTEM_H
#include "../macros.h"
#include <boost/thread.hpp>
#include <list>
#include <functional>
namespace cpputils {
//TODO Test
class ThreadSystem final {
private:
struct RunningThread {
std::function<void()> loopIteration;
boost::thread thread; // boost::thread because we need it to be interruptible.
};
public:
using Handle = std::list<RunningThread>::iterator;
static ThreadSystem &singleton();
Handle start(std::function<void()> loopIteration);
void stop(Handle handle);
private:
ThreadSystem();
static void _runThread(std::function<void()> loopIteration);
static void _onBeforeFork();
static void _onAfterFork();
void _stopAllThreadsForRestart();
void _restartAllThreads();
boost::thread _startThread(std::function<void()> loopIteration);
std::list<RunningThread> _runningThreads; // std::list, because we give out iterators as handles
DISALLOW_COPY_AND_ASSIGN(ThreadSystem);
};
}
#endif