Allow terminating LoopThreads from within

This commit is contained in:
Sebastian Messmer 2015-11-12 13:07:20 -08:00
parent b92a87ab72
commit 9cea5d9090
6 changed files with 19 additions and 14 deletions

View File

@ -15,12 +15,13 @@ namespace cpputils {
return _thread.start(); return _thread.start();
} }
void RandomGeneratorThread::_loopIteration() { bool RandomGeneratorThread::_loopIteration() {
_buffer->waitUntilSizeIsLessThan(_minSize); _buffer->waitUntilSizeIsLessThan(_minSize);
size_t neededRandomDataSize = _maxSize - _buffer->size(); size_t neededRandomDataSize = _maxSize - _buffer->size();
ASSERT(_maxSize > _buffer->size(), "This could theoretically fail if another thread refilled the buffer. But we should be the only refilling thread."); ASSERT(_maxSize > _buffer->size(), "This could theoretically fail if another thread refilled the buffer. But we should be the only refilling thread.");
Data randomData = _generateRandomData(neededRandomDataSize); Data randomData = _generateRandomData(neededRandomDataSize);
_buffer->add(std::move(randomData)); _buffer->add(std::move(randomData));
return true; // Run another iteration (don't terminate thread)
} }
Data RandomGeneratorThread::_generateRandomData(size_t size) { Data RandomGeneratorThread::_generateRandomData(size_t size) {

View File

@ -15,7 +15,7 @@ namespace cpputils {
void start(); void start();
private: private:
void _loopIteration(); bool _loopIteration();
Data _generateRandomData(size_t size); Data _generateRandomData(size_t size);
CryptoPP::AutoSeededRandomPool _randomGenerator; CryptoPP::AutoSeededRandomPool _randomGenerator;

View File

@ -6,7 +6,7 @@ using boost::none;
namespace cpputils { namespace cpputils {
LoopThread::LoopThread(function<void()> loopIteration): _loopIteration(loopIteration), _runningHandle(none) { LoopThread::LoopThread(function<bool()> loopIteration): _loopIteration(loopIteration), _runningHandle(none) {
} }
LoopThread::~LoopThread() { LoopThread::~LoopThread() {

View File

@ -13,13 +13,14 @@ namespace cpputils {
// where the child class destructor already ran. // where the child class destructor already ran.
class LoopThread final { class LoopThread final {
public: public:
LoopThread(std::function<void()> loopIteration); // The loopIteration callback returns true, if more iterations should be run, and false, if the thread should be terminated.
LoopThread(std::function<bool()> loopIteration);
~LoopThread(); ~LoopThread();
void start(); void start();
void stop(); void stop();
private: private:
std::function<void()> _loopIteration; std::function<bool()> _loopIteration;
boost::optional<ThreadSystem::Handle> _runningHandle; boost::optional<ThreadSystem::Handle> _runningHandle;
}; };
} }

View File

@ -17,7 +17,7 @@ namespace cpputils {
pthread_atfork(&ThreadSystem::_onBeforeFork, &ThreadSystem::_onAfterFork, &ThreadSystem::_onAfterFork); pthread_atfork(&ThreadSystem::_onBeforeFork, &ThreadSystem::_onAfterFork, &ThreadSystem::_onAfterFork);
} }
ThreadSystem::Handle ThreadSystem::start(function<void()> loopIteration) { ThreadSystem::Handle ThreadSystem::start(function<bool()> loopIteration) {
boost::unique_lock<boost::mutex> lock(_mutex); boost::unique_lock<boost::mutex> lock(_mutex);
auto thread = _startThread(loopIteration); auto thread = _startThread(loopIteration);
_runningThreads.push_back(RunningThread{loopIteration, std::move(thread)}); _runningThreads.push_back(RunningThread{loopIteration, std::move(thread)});
@ -60,16 +60,18 @@ namespace cpputils {
_mutex.unlock(); // Was locked in the before-fork handler _mutex.unlock(); // Was locked in the before-fork handler
} }
boost::thread ThreadSystem::_startThread(function<void()> loopIteration) { boost::thread ThreadSystem::_startThread(function<bool()> loopIteration) {
return boost::thread(std::bind(&ThreadSystem::_runThread, loopIteration)); return boost::thread(std::bind(&ThreadSystem::_runThread, loopIteration));
} }
void ThreadSystem::_runThread(function<void()> loopIteration) { void ThreadSystem::_runThread(function<bool()> loopIteration) {
try { try {
while(true) { bool cont = true;
while(cont) {
boost::this_thread::interruption_point(); boost::this_thread::interruption_point();
loopIteration(); // This might also be interrupted. cont = loopIteration(); // This might also be interrupted.
} }
//The thread is terminated gracefully.
} catch (const boost::thread_interrupted &e) { } catch (const boost::thread_interrupted &e) {
//Do nothing, exit thread. //Do nothing, exit thread.
} catch (const std::exception &e) { } catch (const std::exception &e) {
@ -77,5 +79,6 @@ namespace cpputils {
} catch (...) { } catch (...) {
LOG(ERROR) << "LoopThread crashed"; LOG(ERROR) << "LoopThread crashed";
} }
//TODO We should remove the thread from _runningThreads here, not in stop().
} }
} }

View File

@ -13,7 +13,7 @@ namespace cpputils {
class ThreadSystem final { class ThreadSystem final {
private: private:
struct RunningThread { struct RunningThread {
std::function<void()> loopIteration; std::function<bool()> loopIteration; // The loopIteration callback returns true, if more iterations should be run, and false, if the thread should be terminated.
boost::thread thread; // boost::thread because we need it to be interruptible. boost::thread thread; // boost::thread because we need it to be interruptible.
}; };
public: public:
@ -21,20 +21,20 @@ namespace cpputils {
static ThreadSystem &singleton(); static ThreadSystem &singleton();
Handle start(std::function<void()> loopIteration); Handle start(std::function<bool()> loopIteration);
void stop(Handle handle); void stop(Handle handle);
private: private:
ThreadSystem(); ThreadSystem();
static void _runThread(std::function<void()> loopIteration); static void _runThread(std::function<bool()> loopIteration);
static void _onBeforeFork(); static void _onBeforeFork();
static void _onAfterFork(); static void _onAfterFork();
//TODO Rename to _doOnBeforeFork and _doAfterFork or similar, because they also handle locking _mutex for fork(). //TODO Rename to _doOnBeforeFork and _doAfterFork or similar, because they also handle locking _mutex for fork().
void _stopAllThreadsForRestart(); void _stopAllThreadsForRestart();
void _restartAllThreads(); void _restartAllThreads();
boost::thread _startThread(std::function<void()> loopIteration); boost::thread _startThread(std::function<bool()> loopIteration);
std::list<RunningThread> _runningThreads; // std::list, because we give out iterators as handles std::list<RunningThread> _runningThreads; // std::list, because we give out iterators as handles
boost::mutex _mutex; boost::mutex _mutex;