From cef9a9bd539e7ff65575a46af61b633b1eef664e Mon Sep 17 00:00:00 2001 From: Sebastian Messmer Date: Sun, 12 Dec 2021 13:00:07 +0100 Subject: [PATCH] Process I/O needs to use the async API to avoid deadlocks --- src/cpp-utils/process/subprocess.cpp | 132 ++++++++++++++++++---- src/cpp-utils/process/subprocess.h | 8 +- test/cpp-utils/assert/backtrace_test.cpp | 2 +- test/cpp-utils/process/SubprocessTest.cpp | 44 ++++---- 4 files changed, 140 insertions(+), 46 deletions(-) diff --git a/src/cpp-utils/process/subprocess.cpp b/src/cpp-utils/process/subprocess.cpp index 64ae12ac..4ce4aea2 100644 --- a/src/cpp-utils/process/subprocess.cpp +++ b/src/cpp-utils/process/subprocess.cpp @@ -4,12 +4,15 @@ #include #include #include +#include using std::string; using std::vector; namespace bp = boost::process; namespace bf = boost::filesystem; +namespace ba = boost::asio; +namespace bs = boost::system; namespace cpputils { @@ -24,48 +27,139 @@ namespace cpputils } return executable; } + + class OutputPipeHandler final + { + public: + explicit OutputPipeHandler(ba::io_context* ctx) + : vOut_(128 * 1024) + , buffer_(ba::buffer(vOut_)) + , pipe_(*ctx) + , output_() { + } + + void async_read() + { + std::function onOutput; + onOutput = [&](const bs::error_code & ec, size_t n) + { + output_.reserve(output_.size() + n); + output_.insert(output_.end(), vOut_.begin(), vOut_.begin() + n); + if (ec) { + if (ec != ba::error::eof) { + throw SubprocessError(std::string() + "Error getting output from subprocess. Error code: " + std::to_string(ec.value()) + " : " + ec.message()); + } + } else { + ba::async_read(pipe_, buffer_, onOutput); + } + }; + ba::async_read(pipe_, buffer_, onOutput); + } + + bp::async_pipe& pipe() + { + return pipe_; + } + + std::string output() && + { + return std::move(output_); + } + + + private: + std::vector vOut_; + ba::mutable_buffer buffer_; + bp::async_pipe pipe_; + std::string output_; + }; + + class InputPipeHandler final + { + public: + explicit InputPipeHandler(ba::io_context* ctx, const std::string& input) + : input_(input) + , buffer_(ba::buffer(input_)) + , pipe_(*ctx) { + + } + + bp::async_pipe& pipe() + { + return pipe_; + } + + void async_write() + { + ba::async_write(pipe_, buffer_, + [&](const bs::error_code & ec, std::size_t /*n*/) + { + if (ec) { + throw SubprocessError(std::string() + "Error sending input to subprocess. Error code: " + std::to_string(ec.value()) + " : " + ec.message()); + } + pipe_.async_close(); + } + ); + } + private: + const std::string& input_; + ba::const_buffer buffer_; + bp::async_pipe pipe_; + }; } - SubprocessResult Subprocess::call(const char *command, const vector &args) + SubprocessResult Subprocess::call(const char *command, const vector &args, const string &input) { - return call(_find_executable(command), args); + return call(_find_executable(command), args, input); } - SubprocessResult Subprocess::check_call(const char *command, const vector &args) + SubprocessResult Subprocess::check_call(const char *command, const vector &args, const string& input) { - return check_call(_find_executable(command), args); + return check_call(_find_executable(command), args, input); } - SubprocessResult Subprocess::call(const bf::path &executable, const vector &args) + SubprocessResult Subprocess::call(const bf::path &executable, const vector &args, const string& input) { if (!bf::exists(executable)) { throw std::runtime_error("Tried to run executable " + executable.string() + " but didn't find it"); } - bp::ipstream child_stdout; - bp::ipstream child_stderr; - bp::child child = bp::child(bp::exe = executable.string(), bp::std_out > child_stdout, bp::std_err > child_stderr, bp::args(args)); - if (!child.valid()) - { - throw std::runtime_error("Error starting subprocess " + executable.string() + ". Errno: " + std::to_string(errno)); - } + // Process I/O needs to use the async API to avoid deadlocks, see + // - https://www.boost.org/doc/libs/1_78_0/doc/html/boost_process/faq.html + // - Code taken from https://www.py4u.net/discuss/97014 and modified - child.join(); + ba::io_context ctx; - string output_stdout = string(std::istreambuf_iterator(child_stdout), {}); - string output_stderr = string(std::istreambuf_iterator(child_stderr), {}); + OutputPipeHandler stdout_handler(&ctx); + OutputPipeHandler stderr_handler(&ctx); + InputPipeHandler stdin_handler(&ctx, input); + + bp::child child( + bp::exe = executable.string(), + bp::args(args), + bp::std_out > stdout_handler.pipe(), + bp::std_err > stderr_handler.pipe(), + bp::std_in < stdin_handler.pipe() + ); + + stdin_handler.async_write(); + stdout_handler.async_read(); + stderr_handler.async_read(); + + ctx.run(); + child.wait(); return SubprocessResult{ - std::move(output_stdout), - std::move(output_stderr), + std::move(stdout_handler).output(), + std::move(stderr_handler).output(), child.exit_code(), }; } - SubprocessResult Subprocess::check_call(const bf::path &executable, const vector &args) + SubprocessResult Subprocess::check_call(const bf::path &executable, const vector &args, const string& input) { - auto result = call(executable, args); + auto result = call(executable, args, input); if (result.exitcode != 0) { throw SubprocessError("Subprocess \"" + executable.string() + "\" exited with code " + std::to_string(result.exitcode)); diff --git a/src/cpp-utils/process/subprocess.h b/src/cpp-utils/process/subprocess.h index bea5a159..a38739bf 100644 --- a/src/cpp-utils/process/subprocess.h +++ b/src/cpp-utils/process/subprocess.h @@ -25,10 +25,10 @@ namespace cpputils class Subprocess final { public: - static SubprocessResult call(const char *command, const std::vector &args); - static SubprocessResult call(const boost::filesystem::path &executable, const std::vector &args); - static SubprocessResult check_call(const char *command, const std::vector &args); - static SubprocessResult check_call(const boost::filesystem::path &executable, const std::vector &args); + static SubprocessResult call(const char *command, const std::vector &args, const std::string& input); + static SubprocessResult call(const boost::filesystem::path &executable, const std::vector &args, const std::string& input); + static SubprocessResult check_call(const char *command, const std::vector &args, const std::string& input); + static SubprocessResult check_call(const boost::filesystem::path &executable, const std::vector &args, const std::string& input); private: DISALLOW_COPY_AND_ASSIGN(Subprocess); diff --git a/test/cpp-utils/assert/backtrace_test.cpp b/test/cpp-utils/assert/backtrace_test.cpp index fdcc5f4c..7b0ab9d3 100644 --- a/test/cpp-utils/assert/backtrace_test.cpp +++ b/test/cpp-utils/assert/backtrace_test.cpp @@ -22,7 +22,7 @@ namespace { throw std::runtime_error(executable.string() + " not found."); } - auto result = cpputils::Subprocess::call(executable, {kind, signal}); + auto result = cpputils::Subprocess::call(executable, {kind, signal}, ""); return result.output_stderr; } } diff --git a/test/cpp-utils/process/SubprocessTest.cpp b/test/cpp-utils/process/SubprocessTest.cpp index c9a98214..586697fe 100644 --- a/test/cpp-utils/process/SubprocessTest.cpp +++ b/test/cpp-utils/process/SubprocessTest.cpp @@ -35,100 +35,100 @@ namespace TEST(SubprocessTest, CheckCall_success_output) { - EXPECT_EQ(std::string("hello") + NEWLINE, Subprocess::check_call(exit_with_message_and_status(), {"0", "hello"}).output_stdout); + EXPECT_EQ(std::string("hello") + NEWLINE, Subprocess::check_call(exit_with_message_and_status(), {"0", "hello"}, "").output_stdout); } TEST(SubprocessTest, CheckCall_successwithemptyoutput_output) { - EXPECT_EQ("", Subprocess::check_call(exit_with_message_and_status(), {"0"}).output_stdout); + EXPECT_EQ("", Subprocess::check_call(exit_with_message_and_status(), {"0"}, "").output_stdout); } TEST(SubprocessTest, CheckCall_success_exitcode) { - EXPECT_EQ(0, Subprocess::check_call(exit_with_message_and_status(), {"0", "hello"}).exitcode); + EXPECT_EQ(0, Subprocess::check_call(exit_with_message_and_status(), {"0", "hello"}, "").exitcode); } TEST(SubprocessTest, CheckCall_successwithemptyoutput_exitcode) { - EXPECT_EQ(0, Subprocess::check_call(exit_with_message_and_status(), {"0"}).exitcode); + EXPECT_EQ(0, Subprocess::check_call(exit_with_message_and_status(), {"0"}, "").exitcode); } TEST(SubprocessTest, CheckCall_error) { EXPECT_THROW( - Subprocess::check_call(exit_with_message_and_status(), {"1"}), + Subprocess::check_call(exit_with_message_and_status(), {"1"}, ""), SubprocessError); } TEST(SubprocessTest, CheckCall_error5) { EXPECT_THROW( - Subprocess::check_call(exit_with_message_and_status(), {"5"}), + Subprocess::check_call(exit_with_message_and_status(), {"5"}, ""), SubprocessError); } TEST(SubprocessTest, CheckCall_errorwithoutput) { EXPECT_THROW( - Subprocess::check_call(exit_with_message_and_status(), {"1", "hello"}), + Subprocess::check_call(exit_with_message_and_status(), {"1", "hello"}, ""), SubprocessError); } TEST(SubprocessTest, CheckCall_error5withoutput) { EXPECT_THROW( - Subprocess::check_call(exit_with_message_and_status(), {"5", "hello"}), + Subprocess::check_call(exit_with_message_and_status(), {"5", "hello"}, ""), SubprocessError); } TEST(SubprocessTest, Call_success_exitcode) { - EXPECT_EQ(0, Subprocess::call(exit_with_message_and_status(), {"0", "hello"}).exitcode); + EXPECT_EQ(0, Subprocess::call(exit_with_message_and_status(), {"0", "hello"}, "").exitcode); } TEST(SubprocessTest, Call_success_output) { - EXPECT_EQ(std::string("hello") + NEWLINE, Subprocess::call(exit_with_message_and_status(), {"0", "hello"}).output_stdout); + EXPECT_EQ(std::string("hello") + NEWLINE, Subprocess::call(exit_with_message_and_status(), {"0", "hello"}, "").output_stdout); } TEST(SubprocessTest, Call_error_exitcode) { - EXPECT_EQ(1, Subprocess::call(exit_with_message_and_status(), {"1"}).exitcode); + EXPECT_EQ(1, Subprocess::call(exit_with_message_and_status(), {"1"}, "").exitcode); } TEST(SubprocessTest, Call_error_output) { - EXPECT_EQ("", Subprocess::call(exit_with_message_and_status(), {"1"}).output_stdout); + EXPECT_EQ("", Subprocess::call(exit_with_message_and_status(), {"1"}, "").output_stdout); } TEST(SubprocessTest, Call_error5_exitcode) { - EXPECT_EQ(5, Subprocess::call(exit_with_message_and_status(), {"5"}).exitcode); + EXPECT_EQ(5, Subprocess::call(exit_with_message_and_status(), {"5"}, "").exitcode); } TEST(SubprocessTest, Call_error5_output) { - EXPECT_EQ("", Subprocess::call(exit_with_message_and_status(), {"1"}).output_stdout); + EXPECT_EQ("", Subprocess::call(exit_with_message_and_status(), {"1"}, "").output_stdout); } TEST(SubprocessTest, Call_errorwithoutput_output) { - EXPECT_EQ(std::string("hello") + NEWLINE, Subprocess::call(exit_with_message_and_status(), {"1", "hello"}).output_stdout); + EXPECT_EQ(std::string("hello") + NEWLINE, Subprocess::call(exit_with_message_and_status(), {"1", "hello"}, "").output_stdout); } TEST(SubprocessTest, Call_errorwithoutput_exitcode) { - EXPECT_EQ(1, Subprocess::call(exit_with_message_and_status(), {"1", "hello"}).exitcode); + EXPECT_EQ(1, Subprocess::call(exit_with_message_and_status(), {"1", "hello"}, "").exitcode); } TEST(SubprocessTest, Call_error5withoutput_output) { - EXPECT_EQ(std::string("hello") + NEWLINE, Subprocess::call(exit_with_message_and_status(), {"5", "hello"}).output_stdout); + EXPECT_EQ(std::string("hello") + NEWLINE, Subprocess::call(exit_with_message_and_status(), {"5", "hello"}, "").output_stdout); } TEST(SubprocessTest, Call_error5withoutput_exitcode) { - EXPECT_EQ(5, Subprocess::call(exit_with_message_and_status(), {"5", "hello"}).exitcode); + EXPECT_EQ(5, Subprocess::call(exit_with_message_and_status(), {"5", "hello"}, "").exitcode); } // TODO Move this test to a test suite for ThreadSystem/LoopThread @@ -140,7 +140,7 @@ TEST(SubprocessTest, CallFromThreadSystemThread) cpputils::LoopThread thread( [&barrier]() { - auto result = Subprocess::check_call(exit_with_message_and_status(), {"0", "hello"}); + auto result = Subprocess::check_call(exit_with_message_and_status(), {"0", "hello"}, ""); EXPECT_EQ(0, result.exitcode); EXPECT_EQ(std::string("hello") + NEWLINE, result.output_stdout); @@ -157,12 +157,12 @@ TEST(SubprocessTest, CallFromThreadSystemThread) TEST(SubprocessTest, Call_argumentwithspaces) { // Test that arguments can have spaces and are still treated as one argument - EXPECT_EQ(std::string("hello world") + NEWLINE, Subprocess::check_call(exit_with_message_and_status(), {"0", "hello world"}).output_stdout); - EXPECT_EQ(std::string("hello") + NEWLINE + "world" + NEWLINE, Subprocess::check_call(exit_with_message_and_status(), {"0", "hello", "world"}).output_stdout); + EXPECT_EQ(std::string("hello world") + NEWLINE, Subprocess::check_call(exit_with_message_and_status(), {"0", "hello world"}, "").output_stdout); + EXPECT_EQ(std::string("hello") + NEWLINE + "world" + NEWLINE, Subprocess::check_call(exit_with_message_and_status(), {"0", "hello", "world"}, "").output_stdout); } TEST(SubprocessTest, Call_withcommandfrompath) { // Test that we can call a system command without specifying the full path - EXPECT_EQ("hello\n", Subprocess::check_call("echo", {"hello"}).output_stdout); + EXPECT_EQ("hello\n", Subprocess::check_call("echo", {"hello"}, "").output_stdout); }