Process I/O needs to use the async API to avoid deadlocks
This commit is contained in:
parent
875f55ee41
commit
cef9a9bd53
@ -4,12 +4,15 @@
|
||||
#include <cerrno>
|
||||
#include <array>
|
||||
#include <boost/process.hpp>
|
||||
#include <boost/asio.hpp>
|
||||
|
||||
using std::string;
|
||||
using std::vector;
|
||||
|
||||
namespace bp = boost::process;
|
||||
namespace bf = boost::filesystem;
|
||||
namespace ba = boost::asio;
|
||||
namespace bs = boost::system;
|
||||
|
||||
namespace cpputils
|
||||
{
|
||||
@ -24,48 +27,139 @@ namespace cpputils
|
||||
}
|
||||
return executable;
|
||||
}
|
||||
|
||||
class OutputPipeHandler final
|
||||
{
|
||||
public:
|
||||
explicit OutputPipeHandler(ba::io_context* ctx)
|
||||
: vOut_(128 * 1024)
|
||||
, buffer_(ba::buffer(vOut_))
|
||||
, pipe_(*ctx)
|
||||
, output_() {
|
||||
}
|
||||
|
||||
void async_read()
|
||||
{
|
||||
std::function<void(const bs::error_code & ec, std::size_t n)> onOutput;
|
||||
onOutput = [&](const bs::error_code & ec, size_t n)
|
||||
{
|
||||
output_.reserve(output_.size() + n);
|
||||
output_.insert(output_.end(), vOut_.begin(), vOut_.begin() + n);
|
||||
if (ec) {
|
||||
if (ec != ba::error::eof) {
|
||||
throw SubprocessError(std::string() + "Error getting output from subprocess. Error code: " + std::to_string(ec.value()) + " : " + ec.message());
|
||||
}
|
||||
} else {
|
||||
ba::async_read(pipe_, buffer_, onOutput);
|
||||
}
|
||||
};
|
||||
ba::async_read(pipe_, buffer_, onOutput);
|
||||
}
|
||||
|
||||
bp::async_pipe& pipe()
|
||||
{
|
||||
return pipe_;
|
||||
}
|
||||
|
||||
std::string output() &&
|
||||
{
|
||||
return std::move(output_);
|
||||
}
|
||||
|
||||
|
||||
private:
|
||||
std::vector<char> vOut_;
|
||||
ba::mutable_buffer buffer_;
|
||||
bp::async_pipe pipe_;
|
||||
std::string output_;
|
||||
};
|
||||
|
||||
class InputPipeHandler final
|
||||
{
|
||||
public:
|
||||
explicit InputPipeHandler(ba::io_context* ctx, const std::string& input)
|
||||
: input_(input)
|
||||
, buffer_(ba::buffer(input_))
|
||||
, pipe_(*ctx) {
|
||||
|
||||
}
|
||||
|
||||
bp::async_pipe& pipe()
|
||||
{
|
||||
return pipe_;
|
||||
}
|
||||
|
||||
void async_write()
|
||||
{
|
||||
ba::async_write(pipe_, buffer_,
|
||||
[&](const bs::error_code & ec, std::size_t /*n*/)
|
||||
{
|
||||
if (ec) {
|
||||
throw SubprocessError(std::string() + "Error sending input to subprocess. Error code: " + std::to_string(ec.value()) + " : " + ec.message());
|
||||
}
|
||||
pipe_.async_close();
|
||||
}
|
||||
);
|
||||
}
|
||||
private:
|
||||
const std::string& input_;
|
||||
ba::const_buffer buffer_;
|
||||
bp::async_pipe pipe_;
|
||||
};
|
||||
}
|
||||
|
||||
SubprocessResult Subprocess::call(const char *command, const vector<string> &args)
|
||||
SubprocessResult Subprocess::call(const char *command, const vector<string> &args, const string &input)
|
||||
{
|
||||
return call(_find_executable(command), args);
|
||||
return call(_find_executable(command), args, input);
|
||||
}
|
||||
|
||||
SubprocessResult Subprocess::check_call(const char *command, const vector<string> &args)
|
||||
SubprocessResult Subprocess::check_call(const char *command, const vector<string> &args, const string& input)
|
||||
{
|
||||
return check_call(_find_executable(command), args);
|
||||
return check_call(_find_executable(command), args, input);
|
||||
}
|
||||
|
||||
SubprocessResult Subprocess::call(const bf::path &executable, const vector<string> &args)
|
||||
SubprocessResult Subprocess::call(const bf::path &executable, const vector<string> &args, const string& input)
|
||||
{
|
||||
if (!bf::exists(executable))
|
||||
{
|
||||
throw std::runtime_error("Tried to run executable " + executable.string() + " but didn't find it");
|
||||
}
|
||||
|
||||
bp::ipstream child_stdout;
|
||||
bp::ipstream child_stderr;
|
||||
bp::child child = bp::child(bp::exe = executable.string(), bp::std_out > child_stdout, bp::std_err > child_stderr, bp::args(args));
|
||||
if (!child.valid())
|
||||
{
|
||||
throw std::runtime_error("Error starting subprocess " + executable.string() + ". Errno: " + std::to_string(errno));
|
||||
}
|
||||
// Process I/O needs to use the async API to avoid deadlocks, see
|
||||
// - https://www.boost.org/doc/libs/1_78_0/doc/html/boost_process/faq.html
|
||||
// - Code taken from https://www.py4u.net/discuss/97014 and modified
|
||||
|
||||
child.join();
|
||||
ba::io_context ctx;
|
||||
|
||||
string output_stdout = string(std::istreambuf_iterator<char>(child_stdout), {});
|
||||
string output_stderr = string(std::istreambuf_iterator<char>(child_stderr), {});
|
||||
OutputPipeHandler stdout_handler(&ctx);
|
||||
OutputPipeHandler stderr_handler(&ctx);
|
||||
InputPipeHandler stdin_handler(&ctx, input);
|
||||
|
||||
bp::child child(
|
||||
bp::exe = executable.string(),
|
||||
bp::args(args),
|
||||
bp::std_out > stdout_handler.pipe(),
|
||||
bp::std_err > stderr_handler.pipe(),
|
||||
bp::std_in < stdin_handler.pipe()
|
||||
);
|
||||
|
||||
stdin_handler.async_write();
|
||||
stdout_handler.async_read();
|
||||
stderr_handler.async_read();
|
||||
|
||||
ctx.run();
|
||||
child.wait();
|
||||
|
||||
return SubprocessResult{
|
||||
std::move(output_stdout),
|
||||
std::move(output_stderr),
|
||||
std::move(stdout_handler).output(),
|
||||
std::move(stderr_handler).output(),
|
||||
child.exit_code(),
|
||||
};
|
||||
}
|
||||
|
||||
SubprocessResult Subprocess::check_call(const bf::path &executable, const vector<string> &args)
|
||||
SubprocessResult Subprocess::check_call(const bf::path &executable, const vector<string> &args, const string& input)
|
||||
{
|
||||
auto result = call(executable, args);
|
||||
auto result = call(executable, args, input);
|
||||
if (result.exitcode != 0)
|
||||
{
|
||||
throw SubprocessError("Subprocess \"" + executable.string() + "\" exited with code " + std::to_string(result.exitcode));
|
||||
|
@ -25,10 +25,10 @@ namespace cpputils
|
||||
class Subprocess final
|
||||
{
|
||||
public:
|
||||
static SubprocessResult call(const char *command, const std::vector<std::string> &args);
|
||||
static SubprocessResult call(const boost::filesystem::path &executable, const std::vector<std::string> &args);
|
||||
static SubprocessResult check_call(const char *command, const std::vector<std::string> &args);
|
||||
static SubprocessResult check_call(const boost::filesystem::path &executable, const std::vector<std::string> &args);
|
||||
static SubprocessResult call(const char *command, const std::vector<std::string> &args, const std::string& input);
|
||||
static SubprocessResult call(const boost::filesystem::path &executable, const std::vector<std::string> &args, const std::string& input);
|
||||
static SubprocessResult check_call(const char *command, const std::vector<std::string> &args, const std::string& input);
|
||||
static SubprocessResult check_call(const boost::filesystem::path &executable, const std::vector<std::string> &args, const std::string& input);
|
||||
|
||||
private:
|
||||
DISALLOW_COPY_AND_ASSIGN(Subprocess);
|
||||
|
@ -22,7 +22,7 @@ namespace
|
||||
{
|
||||
throw std::runtime_error(executable.string() + " not found.");
|
||||
}
|
||||
auto result = cpputils::Subprocess::call(executable, {kind, signal});
|
||||
auto result = cpputils::Subprocess::call(executable, {kind, signal}, "");
|
||||
return result.output_stderr;
|
||||
}
|
||||
}
|
||||
|
@ -35,100 +35,100 @@ namespace
|
||||
|
||||
TEST(SubprocessTest, CheckCall_success_output)
|
||||
{
|
||||
EXPECT_EQ(std::string("hello") + NEWLINE, Subprocess::check_call(exit_with_message_and_status(), {"0", "hello"}).output_stdout);
|
||||
EXPECT_EQ(std::string("hello") + NEWLINE, Subprocess::check_call(exit_with_message_and_status(), {"0", "hello"}, "").output_stdout);
|
||||
}
|
||||
|
||||
TEST(SubprocessTest, CheckCall_successwithemptyoutput_output)
|
||||
{
|
||||
EXPECT_EQ("", Subprocess::check_call(exit_with_message_and_status(), {"0"}).output_stdout);
|
||||
EXPECT_EQ("", Subprocess::check_call(exit_with_message_and_status(), {"0"}, "").output_stdout);
|
||||
}
|
||||
|
||||
TEST(SubprocessTest, CheckCall_success_exitcode)
|
||||
{
|
||||
EXPECT_EQ(0, Subprocess::check_call(exit_with_message_and_status(), {"0", "hello"}).exitcode);
|
||||
EXPECT_EQ(0, Subprocess::check_call(exit_with_message_and_status(), {"0", "hello"}, "").exitcode);
|
||||
}
|
||||
|
||||
TEST(SubprocessTest, CheckCall_successwithemptyoutput_exitcode)
|
||||
{
|
||||
EXPECT_EQ(0, Subprocess::check_call(exit_with_message_and_status(), {"0"}).exitcode);
|
||||
EXPECT_EQ(0, Subprocess::check_call(exit_with_message_and_status(), {"0"}, "").exitcode);
|
||||
}
|
||||
|
||||
TEST(SubprocessTest, CheckCall_error)
|
||||
{
|
||||
EXPECT_THROW(
|
||||
Subprocess::check_call(exit_with_message_and_status(), {"1"}),
|
||||
Subprocess::check_call(exit_with_message_and_status(), {"1"}, ""),
|
||||
SubprocessError);
|
||||
}
|
||||
|
||||
TEST(SubprocessTest, CheckCall_error5)
|
||||
{
|
||||
EXPECT_THROW(
|
||||
Subprocess::check_call(exit_with_message_and_status(), {"5"}),
|
||||
Subprocess::check_call(exit_with_message_and_status(), {"5"}, ""),
|
||||
SubprocessError);
|
||||
}
|
||||
|
||||
TEST(SubprocessTest, CheckCall_errorwithoutput)
|
||||
{
|
||||
EXPECT_THROW(
|
||||
Subprocess::check_call(exit_with_message_and_status(), {"1", "hello"}),
|
||||
Subprocess::check_call(exit_with_message_and_status(), {"1", "hello"}, ""),
|
||||
SubprocessError);
|
||||
}
|
||||
|
||||
TEST(SubprocessTest, CheckCall_error5withoutput)
|
||||
{
|
||||
EXPECT_THROW(
|
||||
Subprocess::check_call(exit_with_message_and_status(), {"5", "hello"}),
|
||||
Subprocess::check_call(exit_with_message_and_status(), {"5", "hello"}, ""),
|
||||
SubprocessError);
|
||||
}
|
||||
|
||||
TEST(SubprocessTest, Call_success_exitcode)
|
||||
{
|
||||
EXPECT_EQ(0, Subprocess::call(exit_with_message_and_status(), {"0", "hello"}).exitcode);
|
||||
EXPECT_EQ(0, Subprocess::call(exit_with_message_and_status(), {"0", "hello"}, "").exitcode);
|
||||
}
|
||||
|
||||
TEST(SubprocessTest, Call_success_output)
|
||||
{
|
||||
EXPECT_EQ(std::string("hello") + NEWLINE, Subprocess::call(exit_with_message_and_status(), {"0", "hello"}).output_stdout);
|
||||
EXPECT_EQ(std::string("hello") + NEWLINE, Subprocess::call(exit_with_message_and_status(), {"0", "hello"}, "").output_stdout);
|
||||
}
|
||||
|
||||
TEST(SubprocessTest, Call_error_exitcode)
|
||||
{
|
||||
EXPECT_EQ(1, Subprocess::call(exit_with_message_and_status(), {"1"}).exitcode);
|
||||
EXPECT_EQ(1, Subprocess::call(exit_with_message_and_status(), {"1"}, "").exitcode);
|
||||
}
|
||||
|
||||
TEST(SubprocessTest, Call_error_output)
|
||||
{
|
||||
EXPECT_EQ("", Subprocess::call(exit_with_message_and_status(), {"1"}).output_stdout);
|
||||
EXPECT_EQ("", Subprocess::call(exit_with_message_and_status(), {"1"}, "").output_stdout);
|
||||
}
|
||||
|
||||
TEST(SubprocessTest, Call_error5_exitcode)
|
||||
{
|
||||
EXPECT_EQ(5, Subprocess::call(exit_with_message_and_status(), {"5"}).exitcode);
|
||||
EXPECT_EQ(5, Subprocess::call(exit_with_message_and_status(), {"5"}, "").exitcode);
|
||||
}
|
||||
|
||||
TEST(SubprocessTest, Call_error5_output)
|
||||
{
|
||||
EXPECT_EQ("", Subprocess::call(exit_with_message_and_status(), {"1"}).output_stdout);
|
||||
EXPECT_EQ("", Subprocess::call(exit_with_message_and_status(), {"1"}, "").output_stdout);
|
||||
}
|
||||
|
||||
TEST(SubprocessTest, Call_errorwithoutput_output)
|
||||
{
|
||||
EXPECT_EQ(std::string("hello") + NEWLINE, Subprocess::call(exit_with_message_and_status(), {"1", "hello"}).output_stdout);
|
||||
EXPECT_EQ(std::string("hello") + NEWLINE, Subprocess::call(exit_with_message_and_status(), {"1", "hello"}, "").output_stdout);
|
||||
}
|
||||
|
||||
TEST(SubprocessTest, Call_errorwithoutput_exitcode)
|
||||
{
|
||||
EXPECT_EQ(1, Subprocess::call(exit_with_message_and_status(), {"1", "hello"}).exitcode);
|
||||
EXPECT_EQ(1, Subprocess::call(exit_with_message_and_status(), {"1", "hello"}, "").exitcode);
|
||||
}
|
||||
|
||||
TEST(SubprocessTest, Call_error5withoutput_output)
|
||||
{
|
||||
EXPECT_EQ(std::string("hello") + NEWLINE, Subprocess::call(exit_with_message_and_status(), {"5", "hello"}).output_stdout);
|
||||
EXPECT_EQ(std::string("hello") + NEWLINE, Subprocess::call(exit_with_message_and_status(), {"5", "hello"}, "").output_stdout);
|
||||
}
|
||||
|
||||
TEST(SubprocessTest, Call_error5withoutput_exitcode)
|
||||
{
|
||||
EXPECT_EQ(5, Subprocess::call(exit_with_message_and_status(), {"5", "hello"}).exitcode);
|
||||
EXPECT_EQ(5, Subprocess::call(exit_with_message_and_status(), {"5", "hello"}, "").exitcode);
|
||||
}
|
||||
|
||||
// TODO Move this test to a test suite for ThreadSystem/LoopThread
|
||||
@ -140,7 +140,7 @@ TEST(SubprocessTest, CallFromThreadSystemThread)
|
||||
cpputils::LoopThread thread(
|
||||
[&barrier]()
|
||||
{
|
||||
auto result = Subprocess::check_call(exit_with_message_and_status(), {"0", "hello"});
|
||||
auto result = Subprocess::check_call(exit_with_message_and_status(), {"0", "hello"}, "");
|
||||
EXPECT_EQ(0, result.exitcode);
|
||||
EXPECT_EQ(std::string("hello") + NEWLINE, result.output_stdout);
|
||||
|
||||
@ -157,12 +157,12 @@ TEST(SubprocessTest, CallFromThreadSystemThread)
|
||||
TEST(SubprocessTest, Call_argumentwithspaces)
|
||||
{
|
||||
// Test that arguments can have spaces and are still treated as one argument
|
||||
EXPECT_EQ(std::string("hello world") + NEWLINE, Subprocess::check_call(exit_with_message_and_status(), {"0", "hello world"}).output_stdout);
|
||||
EXPECT_EQ(std::string("hello") + NEWLINE + "world" + NEWLINE, Subprocess::check_call(exit_with_message_and_status(), {"0", "hello", "world"}).output_stdout);
|
||||
EXPECT_EQ(std::string("hello world") + NEWLINE, Subprocess::check_call(exit_with_message_and_status(), {"0", "hello world"}, "").output_stdout);
|
||||
EXPECT_EQ(std::string("hello") + NEWLINE + "world" + NEWLINE, Subprocess::check_call(exit_with_message_and_status(), {"0", "hello", "world"}, "").output_stdout);
|
||||
}
|
||||
|
||||
TEST(SubprocessTest, Call_withcommandfrompath)
|
||||
{
|
||||
// Test that we can call a system command without specifying the full path
|
||||
EXPECT_EQ("hello\n", Subprocess::check_call("echo", {"hello"}).output_stdout);
|
||||
EXPECT_EQ("hello\n", Subprocess::check_call("echo", {"hello"}, "").output_stdout);
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user