Process I/O needs to use the async API to avoid deadlocks

This commit is contained in:
Sebastian Messmer 2021-12-12 13:00:07 +01:00
parent e524468cc1
commit c4bc749fa6
4 changed files with 140 additions and 46 deletions

View File

@ -4,12 +4,15 @@
#include <cerrno>
#include <array>
#include <boost/process.hpp>
#include <boost/asio.hpp>
using std::string;
using std::vector;
namespace bp = boost::process;
namespace bf = boost::filesystem;
namespace ba = boost::asio;
namespace bs = boost::system;
namespace cpputils
{
@ -24,48 +27,139 @@ namespace cpputils
}
return executable;
}
class OutputPipeHandler final
{
public:
explicit OutputPipeHandler(ba::io_context* ctx)
: vOut_(128 * 1024)
, buffer_(ba::buffer(vOut_))
, pipe_(*ctx)
, output_() {
}
void async_read()
{
std::function<void(const bs::error_code & ec, std::size_t n)> onOutput;
onOutput = [&](const bs::error_code & ec, size_t n)
{
output_.reserve(output_.size() + n);
output_.insert(output_.end(), vOut_.begin(), vOut_.begin() + n);
if (ec) {
if (ec != ba::error::eof) {
throw SubprocessError(std::string() + "Error getting output from subprocess. Error code: " + std::to_string(ec.value()) + " : " + ec.message());
}
} else {
ba::async_read(pipe_, buffer_, onOutput);
}
};
ba::async_read(pipe_, buffer_, onOutput);
}
bp::async_pipe& pipe()
{
return pipe_;
}
std::string output() &&
{
return std::move(output_);
}
private:
std::vector<char> vOut_;
ba::mutable_buffer buffer_;
bp::async_pipe pipe_;
std::string output_;
};
class InputPipeHandler final
{
public:
explicit InputPipeHandler(ba::io_context* ctx, const std::string& input)
: input_(input)
, buffer_(ba::buffer(input_))
, pipe_(*ctx) {
}
bp::async_pipe& pipe()
{
return pipe_;
}
void async_write()
{
ba::async_write(pipe_, buffer_,
[&](const bs::error_code & ec, std::size_t /*n*/)
{
if (ec) {
throw SubprocessError(std::string() + "Error sending input to subprocess. Error code: " + std::to_string(ec.value()) + " : " + ec.message());
}
pipe_.async_close();
}
);
}
private:
const std::string& input_;
ba::const_buffer buffer_;
bp::async_pipe pipe_;
};
}
SubprocessResult Subprocess::call(const char *command, const vector<string> &args)
SubprocessResult Subprocess::call(const char *command, const vector<string> &args, const string &input)
{
return call(_find_executable(command), args);
return call(_find_executable(command), args, input);
}
SubprocessResult Subprocess::check_call(const char *command, const vector<string> &args)
SubprocessResult Subprocess::check_call(const char *command, const vector<string> &args, const string& input)
{
return check_call(_find_executable(command), args);
return check_call(_find_executable(command), args, input);
}
SubprocessResult Subprocess::call(const bf::path &executable, const vector<string> &args)
SubprocessResult Subprocess::call(const bf::path &executable, const vector<string> &args, const string& input)
{
if (!bf::exists(executable))
{
throw std::runtime_error("Tried to run executable " + executable.string() + " but didn't find it");
}
bp::ipstream child_stdout;
bp::ipstream child_stderr;
bp::child child = bp::child(bp::exe = executable.string(), bp::std_out > child_stdout, bp::std_err > child_stderr, bp::args(args));
if (!child.valid())
{
throw std::runtime_error("Error starting subprocess " + executable.string() + ". Errno: " + std::to_string(errno));
}
// Process I/O needs to use the async API to avoid deadlocks, see
// - https://www.boost.org/doc/libs/1_78_0/doc/html/boost_process/faq.html
// - Code taken from https://www.py4u.net/discuss/97014 and modified
child.join();
ba::io_context ctx;
string output_stdout = string(std::istreambuf_iterator<char>(child_stdout), {});
string output_stderr = string(std::istreambuf_iterator<char>(child_stderr), {});
OutputPipeHandler stdout_handler(&ctx);
OutputPipeHandler stderr_handler(&ctx);
InputPipeHandler stdin_handler(&ctx, input);
bp::child child(
bp::exe = executable.string(),
bp::args(args),
bp::std_out > stdout_handler.pipe(),
bp::std_err > stderr_handler.pipe(),
bp::std_in < stdin_handler.pipe()
);
stdin_handler.async_write();
stdout_handler.async_read();
stderr_handler.async_read();
ctx.run();
child.wait();
return SubprocessResult{
std::move(output_stdout),
std::move(output_stderr),
std::move(stdout_handler).output(),
std::move(stderr_handler).output(),
child.exit_code(),
};
}
SubprocessResult Subprocess::check_call(const bf::path &executable, const vector<string> &args)
SubprocessResult Subprocess::check_call(const bf::path &executable, const vector<string> &args, const string& input)
{
auto result = call(executable, args);
auto result = call(executable, args, input);
if (result.exitcode != 0)
{
throw SubprocessError("Subprocess \"" + executable.string() + "\" exited with code " + std::to_string(result.exitcode));

View File

@ -25,10 +25,10 @@ namespace cpputils
class Subprocess final
{
public:
static SubprocessResult call(const char *command, const std::vector<std::string> &args);
static SubprocessResult call(const boost::filesystem::path &executable, const std::vector<std::string> &args);
static SubprocessResult check_call(const char *command, const std::vector<std::string> &args);
static SubprocessResult check_call(const boost::filesystem::path &executable, const std::vector<std::string> &args);
static SubprocessResult call(const char *command, const std::vector<std::string> &args, const std::string& input);
static SubprocessResult call(const boost::filesystem::path &executable, const std::vector<std::string> &args, const std::string& input);
static SubprocessResult check_call(const char *command, const std::vector<std::string> &args, const std::string& input);
static SubprocessResult check_call(const boost::filesystem::path &executable, const std::vector<std::string> &args, const std::string& input);
private:
DISALLOW_COPY_AND_ASSIGN(Subprocess);

View File

@ -22,7 +22,7 @@ namespace
{
throw std::runtime_error(executable.string() + " not found.");
}
auto result = cpputils::Subprocess::call(executable, {kind, signal});
auto result = cpputils::Subprocess::call(executable, {kind, signal}, "");
return result.output_stderr;
}
}

View File

@ -35,100 +35,100 @@ namespace
TEST(SubprocessTest, CheckCall_success_output)
{
EXPECT_EQ(std::string("hello") + NEWLINE, Subprocess::check_call(exit_with_message_and_status(), {"0", "hello"}).output_stdout);
EXPECT_EQ(std::string("hello") + NEWLINE, Subprocess::check_call(exit_with_message_and_status(), {"0", "hello"}, "").output_stdout);
}
TEST(SubprocessTest, CheckCall_successwithemptyoutput_output)
{
EXPECT_EQ("", Subprocess::check_call(exit_with_message_and_status(), {"0"}).output_stdout);
EXPECT_EQ("", Subprocess::check_call(exit_with_message_and_status(), {"0"}, "").output_stdout);
}
TEST(SubprocessTest, CheckCall_success_exitcode)
{
EXPECT_EQ(0, Subprocess::check_call(exit_with_message_and_status(), {"0", "hello"}).exitcode);
EXPECT_EQ(0, Subprocess::check_call(exit_with_message_and_status(), {"0", "hello"}, "").exitcode);
}
TEST(SubprocessTest, CheckCall_successwithemptyoutput_exitcode)
{
EXPECT_EQ(0, Subprocess::check_call(exit_with_message_and_status(), {"0"}).exitcode);
EXPECT_EQ(0, Subprocess::check_call(exit_with_message_and_status(), {"0"}, "").exitcode);
}
TEST(SubprocessTest, CheckCall_error)
{
EXPECT_THROW(
Subprocess::check_call(exit_with_message_and_status(), {"1"}),
Subprocess::check_call(exit_with_message_and_status(), {"1"}, ""),
SubprocessError);
}
TEST(SubprocessTest, CheckCall_error5)
{
EXPECT_THROW(
Subprocess::check_call(exit_with_message_and_status(), {"5"}),
Subprocess::check_call(exit_with_message_and_status(), {"5"}, ""),
SubprocessError);
}
TEST(SubprocessTest, CheckCall_errorwithoutput)
{
EXPECT_THROW(
Subprocess::check_call(exit_with_message_and_status(), {"1", "hello"}),
Subprocess::check_call(exit_with_message_and_status(), {"1", "hello"}, ""),
SubprocessError);
}
TEST(SubprocessTest, CheckCall_error5withoutput)
{
EXPECT_THROW(
Subprocess::check_call(exit_with_message_and_status(), {"5", "hello"}),
Subprocess::check_call(exit_with_message_and_status(), {"5", "hello"}, ""),
SubprocessError);
}
TEST(SubprocessTest, Call_success_exitcode)
{
EXPECT_EQ(0, Subprocess::call(exit_with_message_and_status(), {"0", "hello"}).exitcode);
EXPECT_EQ(0, Subprocess::call(exit_with_message_and_status(), {"0", "hello"}, "").exitcode);
}
TEST(SubprocessTest, Call_success_output)
{
EXPECT_EQ(std::string("hello") + NEWLINE, Subprocess::call(exit_with_message_and_status(), {"0", "hello"}).output_stdout);
EXPECT_EQ(std::string("hello") + NEWLINE, Subprocess::call(exit_with_message_and_status(), {"0", "hello"}, "").output_stdout);
}
TEST(SubprocessTest, Call_error_exitcode)
{
EXPECT_EQ(1, Subprocess::call(exit_with_message_and_status(), {"1"}).exitcode);
EXPECT_EQ(1, Subprocess::call(exit_with_message_and_status(), {"1"}, "").exitcode);
}
TEST(SubprocessTest, Call_error_output)
{
EXPECT_EQ("", Subprocess::call(exit_with_message_and_status(), {"1"}).output_stdout);
EXPECT_EQ("", Subprocess::call(exit_with_message_and_status(), {"1"}, "").output_stdout);
}
TEST(SubprocessTest, Call_error5_exitcode)
{
EXPECT_EQ(5, Subprocess::call(exit_with_message_and_status(), {"5"}).exitcode);
EXPECT_EQ(5, Subprocess::call(exit_with_message_and_status(), {"5"}, "").exitcode);
}
TEST(SubprocessTest, Call_error5_output)
{
EXPECT_EQ("", Subprocess::call(exit_with_message_and_status(), {"1"}).output_stdout);
EXPECT_EQ("", Subprocess::call(exit_with_message_and_status(), {"1"}, "").output_stdout);
}
TEST(SubprocessTest, Call_errorwithoutput_output)
{
EXPECT_EQ(std::string("hello") + NEWLINE, Subprocess::call(exit_with_message_and_status(), {"1", "hello"}).output_stdout);
EXPECT_EQ(std::string("hello") + NEWLINE, Subprocess::call(exit_with_message_and_status(), {"1", "hello"}, "").output_stdout);
}
TEST(SubprocessTest, Call_errorwithoutput_exitcode)
{
EXPECT_EQ(1, Subprocess::call(exit_with_message_and_status(), {"1", "hello"}).exitcode);
EXPECT_EQ(1, Subprocess::call(exit_with_message_and_status(), {"1", "hello"}, "").exitcode);
}
TEST(SubprocessTest, Call_error5withoutput_output)
{
EXPECT_EQ(std::string("hello") + NEWLINE, Subprocess::call(exit_with_message_and_status(), {"5", "hello"}).output_stdout);
EXPECT_EQ(std::string("hello") + NEWLINE, Subprocess::call(exit_with_message_and_status(), {"5", "hello"}, "").output_stdout);
}
TEST(SubprocessTest, Call_error5withoutput_exitcode)
{
EXPECT_EQ(5, Subprocess::call(exit_with_message_and_status(), {"5", "hello"}).exitcode);
EXPECT_EQ(5, Subprocess::call(exit_with_message_and_status(), {"5", "hello"}, "").exitcode);
}
// TODO Move this test to a test suite for ThreadSystem/LoopThread
@ -140,7 +140,7 @@ TEST(SubprocessTest, CallFromThreadSystemThread)
cpputils::LoopThread thread(
[&barrier]()
{
auto result = Subprocess::check_call(exit_with_message_and_status(), {"0", "hello"});
auto result = Subprocess::check_call(exit_with_message_and_status(), {"0", "hello"}, "");
EXPECT_EQ(0, result.exitcode);
EXPECT_EQ(std::string("hello") + NEWLINE, result.output_stdout);
@ -157,12 +157,12 @@ TEST(SubprocessTest, CallFromThreadSystemThread)
TEST(SubprocessTest, Call_argumentwithspaces)
{
// Test that arguments can have spaces and are still treated as one argument
EXPECT_EQ(std::string("hello world") + NEWLINE, Subprocess::check_call(exit_with_message_and_status(), {"0", "hello world"}).output_stdout);
EXPECT_EQ(std::string("hello") + NEWLINE + "world" + NEWLINE, Subprocess::check_call(exit_with_message_and_status(), {"0", "hello", "world"}).output_stdout);
EXPECT_EQ(std::string("hello world") + NEWLINE, Subprocess::check_call(exit_with_message_and_status(), {"0", "hello world"}, "").output_stdout);
EXPECT_EQ(std::string("hello") + NEWLINE + "world" + NEWLINE, Subprocess::check_call(exit_with_message_and_status(), {"0", "hello", "world"}, "").output_stdout);
}
TEST(SubprocessTest, Call_withcommandfrompath)
{
// Test that we can call a system command without specifying the full path
EXPECT_EQ("hello\n", Subprocess::check_call("echo", {"hello"}).output_stdout);
EXPECT_EQ("hello\n", Subprocess::check_call("echo", {"hello"}, "").output_stdout);
}