从音频播放C++类的实现学习Linux系统子进程管理
为了实现一个可以及时停止的音频播放类,学习使用fork、execl、setpgid、waitpid和进程组概念。
需求
之前车端软件播放音频文件都是在代码中使用popen、system等库函数执行Shell命令aplay
播放完整的短音频,不需要被打断。而这次的需求需要声音被及时打断,使用popen等库函数皆不能优雅的提前终止,所以有了此次造轮子的行为。其实boost库中有process模块,也能提前终止进程,但是看其示例感觉比较复杂容易使用不当,其中spawn、group等用法让人不看源代码、不学习底层原理,就不敢轻易使用。权衡了一下,总之都要学习一下进程管理,不如自己造个轮子,也能实操加深理解。
代码分析
代码的地址在这里
打断播放的音频:进程组的应用
在遇到另一篇文章中问题诊断: 小手册系列「1」的“popen后台符号&导致僵尸进程”问题时,我就观察到用popen
执行非内建命令时会至少两个进程,一个是/bin/sh
,另一个才是输入命令的进程,并且如果执行kill
命令杀死/bin/sh
进程并不会导致输入命令的进程退出。原因是popen
内部先创建Shell进程将用户命令作为参数传给Shell进程执行,Shell进程会创建子进程执行实际的命令,而kill父进程的pid不会将信号发给子进程。
对于本需求需要主动结束播放音频进程,如果使用popen
则难以获取播放音频进程的pid;如果使用fork
和exec*
系统调用创建子进程可以拿到子进程id,当然可以直接exec*
创建播放音频进程并且拿到其pid,通过kill函数可以提前终止。而播放音频命令aplay
不支持循环播放,这种做法需要在外部进行循环调用并不优雅,而且假设播放音频命令也创建子进程,kill命令不能结束子进程,可能会造成资源的泄漏。所以打算和popen
一样执行Shell命令,可以使用while
等命令实现循环调用,使用进程组就可以将结束信号发给Shell进程和其子进程。
下面主要介绍下进程组概念,进程组和会话主要用于Shell作业控制。
进程组由一个或多个共享同一进程组标识符(PGID)的进程组成,进程组标识符(PGID)等于首次创建进程组的进程pid,其生命周期为创建时刻开始,持续到最后一个进程退出进程组,在其生命周期不会有和PGID相同的pid分配给新进程。子进程会继承父进程的进程组。
会话则是一组进程组的集合。进程的会话成员关系是由其会话标识符(SID)确定的,会话标识符也是会话创建进程的pid。下图来自1,表示了进程组和会话之间的关系。
上图可以看到像sort < file | uniq -c
带有管道的命令会创建两个进程且在同一进程组,这样我们在前台执行ctrl+c时的终端会通过killpg
将信号发送给进程组中的每个进程,从而结束这条命令。利用这个特点,我们可以fork
后的子进程设置其为进程组的首进程,在父进程就可以通过killpg
发送信号,杀死子进程和其产生的子进程,从而达到目的。
pid_t exec(const std::string &cmd) {
LogStringStream log_ss;
log_ss << __PRETTY_FUNCTION__ << ", cmd: " << cmd;
if (cmd.empty()) return -1;
pid_t pid = fork();
if (pid == 0) {
::setpgid(0, 0);
::execl("/bin/sh", "sh", "-c", cmd.c_str(), nullptr);
LOG_ERROR << "execl failure!";
_exit(EXIT_FAILURE);
} else if (pid < 0) {
LOG_ERROR << "fork failed: " << pid;
} else {
log_ss << ", pid: " << pid << ", setpgid ret: " << ::setpgid(pid, pid);
}
return pid;
}
核心代码如上所示,其中setpgid
是用来将进程为pid的进程的进程组修改为pgid,定义如下:
#include <unistd.h>
int setpgid(pid_t pid, pid_t pgid);
如果将pid的值设置为0,那么就会设置调用进程的进程组。如果将pgid的值设置为0,那么设置的进程组pgid等于调用进程的pid。所以在fork
后的子进程中可以直接调用::setpgid(0, 0);
将自己设置为进程组的首进程,即其pid==pgid,从而使得其创建的子进程都继承其进程组id。父进程即可通过killpg(pid, sig)
向/bin/sh进程和其产生的子进程发送sig信号。一个进程在其子进程已经执行exec()后就无法修改该子进程的进程组pgid了。
需要特别注意的是在父进程也调用了::setpgid(pid, pid)
,重复设置子进程的进程组id。这是进程安全的做法,由于fork
后子进程和父进程执行顺序不确定,如果只在子进程设置,可能出现子进程还没设置其线程组,而父进程就对线程组调用诸如killpg
等命令造成并发问题。所以在父进程调用是很有必要的。
-c选项Shell会直接执行该命令,而不是创建一个子Shell进程。
程序崩溃时音频自动停止
在循环播放音频时,如果父进程崩溃,并不会影响到子进程继续执行,此时声音就会一直被播放,这是不能被接受的。所以考虑在循环执行播放命令时做判断其父进程是否存在。
我的实现如下,通过持续判断当前进程的父进程(/proc/$$/stat文件中动态更新)是否等于进程创建时的父进程id($PPID)来判断父进程是否活着,如果父进程崩溃了,该子进程会被系统进程收养,其父进程id会改变。之所以没有直接判断$PPID进程是否还存在,是因为$PPID进程号可能被新启动的进程分配到,这种情况下主进程虽然已经崩溃,但是$PPID却还存在。
while [ $PPID -eq $(cut -d ' ' -f 4 /proc/$$/stat) ]; do
aplay voice_path
done
锁的使用
对上述进程管理操作,我封装了一个ShellProcess
类,用于管理子进程,其只有一个数据成员pid_
保存子进程的pid。同时封装VoicePlayer
使用ShellProcess
管理子进程播放音频。
在实现阻塞播放一次音频时,发现了一个难点:首先对于shell_process_
对象的调用需要加锁保护防止多线程竞争,但是我不希望在阻塞播放音频时全程持有这个锁,这样就不能在其它线程调用stop
(其也需要加锁)提前终止这次播放。
我的实现是在加锁的状态下进行exec
播放进程并保存其pid到栈上。然后释放锁再调用waitpid
,此时其它线程就可以随时stop
了。之后在waitpid
返回子进程结束后在, 再申请锁将shell_process_
对象持有的pid赋值为-1,表明子进程已经结束。当然为了防止竟态条件,只在该对象持有的pid==保存的pid时才进行操作。
void playOnceBlock(const std::string &voice_path) {
LogStringStream log_ss;
log_ss << __PRETTY_FUNCTION__ << ", path: " << voice_path;
if (check_file_exist(voice_path)) {
pid_t pid_voice = -1;
{
std::lock_guard<std::mutex> lk_shell_process(shell_process_mutex_);
shell_process_ = ShellProcess("aplay " + voice_path);
pid_voice = shell_process_.pid();
log_ss << ", pid: " << pid_voice;
cur_voice_path = voice_path;
}
log_ss.flush_log();
if (pid_voice > 0) {
int status = -1;
int waitpid_ret = ::waitpid(pid_voice, &status, 0); // waitpid阻塞时不持有锁, 其它线程可调用stop提前结束
log_ss << "waitpid, status: " << status << ", exited: " << (WIFEXITED(status) ? WEXITSTATUS(status) : -1)
<< ", signal: " << (WIFSIGNALED(status) ? WTERMSIG(status) : -1) << ", waitpid_ret: " << waitpid_ret;
}
{
std::lock_guard<std::mutex> lk_shell_process(shell_process_mutex_);
log_ss << "cur_pid: " << shell_process_.pid() << ", pid_voice: " << pid_voice;
if (shell_process_.pid() == pid_voice) log_ss << ", terminate: " << shell_process_.terminate();
cur_voice_path = "";
}
} else {
log_ss << ", not exist";
}
}
完整代码
update on 2024.06.25.
#define LOG_ERROR std::cerr << std::endl
#define LOG_INFO std::cout << std::endl
bool check_file_exist(const std::string &file_path) {
std::ifstream file(file_path);
return file.good();
}
class LogStringStream : public std::ostringstream {
public:
~LogStringStream() {
auto log_str = this->str();
if (log_str.size()) {
LOG_INFO << log_str;
}
}
void flush_log() {
auto log_str = this->str();
if (log_str.size()) {
LOG_INFO << log_str;
this->str("");
}
}
};
class DeferFunction {
public:
explicit DeferFunction(const std::function<void()> &defer_func) : defer_func_(defer_func) {}
~DeferFunction() { defer_func_(); }
private:
std::function<void()> defer_func_;
};
class ShellProcess {
public:
ShellProcess() = default;
ShellProcess(const std::string &cmd) : pid_(exec(cmd)) {}
ShellProcess(ShellProcess &&lhs) noexcept : pid_(lhs.pid_) { lhs.pid_ = -1; }
ShellProcess &operator=(ShellProcess &&lhs) noexcept {
if (this != &lhs) {
terminate();
this->pid_ = lhs.pid_;
lhs.pid_ = -1;
}
return *this;
}
ShellProcess(const ShellProcess &) = delete;
ShellProcess &operator=(const ShellProcess &) = delete;
int terminate() {
if (pid_ > 0) {
LogStringStream log_ss;
log_ss << __PRETTY_FUNCTION__ << ", pid: " << pid_;
int killpg_ret = ::killpg(pid_, SIGKILL);
log_ss << ", killpg_ret: " << killpg_ret;
std::this_thread::yield();
if (killpg_ret < 0) {
log_ss << ", error: " << strerror(errno);
}
return waitNoBlock();
}
return -1;
}
int wait() {
LogStringStream log_ss;
log_ss << __PRETTY_FUNCTION__ << ", pid: " << pid_;
if (pid_ > 0) {
int status = -1;
int waitpid_ret = ::waitpid(pid_, &status, 0);
pid_ = -1;
log_ss << ", status: " << status << ", exited: " << (WIFEXITED(status) ? WEXITSTATUS(status) : -1)
<< ", signal: " << (WIFSIGNALED(status) ? WTERMSIG(status) : -1) << ", waitpid_ret: " << waitpid_ret;
if (waitpid_ret < 0) {
log_ss << ", error: " << strerror(errno);
return -1;
}
return status;
}
return -1;
}
~ShellProcess() { terminate(); }
pid_t pid() const { return pid_; };
private:
pid_t exec(const std::string &cmd) {
LogStringStream log_ss;
log_ss << __PRETTY_FUNCTION__ << ", cmd: " << cmd;
if (cmd.empty()) return -1;
pid_t pid = fork();
if (pid == 0) {
::setpgid(0, 0);
::execl("/bin/sh", "sh", "-c", cmd.c_str(), nullptr);
LOG_ERROR << "execl failure!";
_exit(EXIT_FAILURE);
} else if (pid < 0) {
LOG_ERROR << "fork failed: " << pid;
} else {
log_ss << ", pid: " << pid << ", setpgid ret: " << ::setpgid(pid, pid);
}
return pid;
}
int waitNoBlock() {
DeferFunction defer_func([this] { this->pid_ = -1; });
LogStringStream log_ss;
log_ss << __PRETTY_FUNCTION__ << ", pid: " << pid_;
if (pid_ > 0) {
static const int max_retry_times = 15;
int status = -1;
int waitpid_ret = 0;
int retry_times = max_retry_times;
do {
waitpid_ret = ::waitpid(pid_, &status, WNOHANG);
if (max_retry_times - retry_times < 5)
std::this_thread::sleep_for(std::chrono::milliseconds(10));
else
std::this_thread::yield();
} while (--retry_times > 0 && ((waitpid_ret == 0) || (waitpid_ret == -1 && errno == EINTR) ||
(waitpid_ret != -1 && !WIFEXITED(status) && !WIFSIGNALED(status))));
log_ss << ", retry_times: " << retry_times + 1 << ", status: " << status
<< ", exited: " << (WIFEXITED(status) ? WEXITSTATUS(status) : -1)
<< ", signal: " << (WIFSIGNALED(status) ? WTERMSIG(status) : -1) << ", waitpid_ret: " << waitpid_ret;
return status;
}
return -1;
}
pid_t pid_ = -1;
};
class VoicePlayer {
public:
void playContinuous(const std::string &voice_path) {
LogStringStream log_ss;
log_ss << __PRETTY_FUNCTION__ << ", path: " << voice_path;
if (check_file_exist(voice_path)) {
std::lock_guard<std::mutex> lk_shell_process(shell_process_mutex_);
shell_process_ = ShellProcess("while [ $PPID -eq $(cut -d ' ' -f 4 /proc/$$/stat) ]; do aplay " + voice_path + "; done");
log_ss << ", pid: " << shell_process_.pid();
cur_voice_path = voice_path;
} else {
log_ss << ", not exist";
}
}
void playOnce(const std::string &voice_path) {
LogStringStream log_ss;
log_ss << __PRETTY_FUNCTION__ << ", path: " << voice_path;
if (check_file_exist(voice_path)) {
std::lock_guard<std::mutex> lk_shell_process(shell_process_mutex_);
shell_process_ = ShellProcess("aplay " + voice_path);
log_ss << ", pid: " << shell_process_.pid();
cur_voice_path = voice_path;
} else {
log_ss << ", not exist";
}
}
void playOnceBlock(const std::string &voice_path) {
LogStringStream log_ss;
log_ss << __PRETTY_FUNCTION__ << ", path: " << voice_path;
if (check_file_exist(voice_path)) {
pid_t pid_voice = -1;
{
std::lock_guard<std::mutex> lk_shell_process(shell_process_mutex_);
shell_process_ = ShellProcess("aplay " + voice_path);
pid_voice = shell_process_.pid();
log_ss << ", pid: " << pid_voice;
cur_voice_path = voice_path;
}
log_ss.flush_log();
if (pid_voice > 0) {
int status = -1;
int waitpid_ret = ::waitpid(pid_voice, &status, 0); // waitpid阻塞时不持有锁, 其它线程可调用stop提前结束
log_ss << "waitpid, status: " << status << ", exited: " << (WIFEXITED(status) ? WEXITSTATUS(status) : -1)
<< ", signal: " << (WIFSIGNALED(status) ? WTERMSIG(status) : -1) << ", waitpid_ret: " << waitpid_ret;
}
{
std::lock_guard<std::mutex> lk_shell_process(shell_process_mutex_);
log_ss << "cur_pid: " << shell_process_.pid() << ", pid_voice: " << pid_voice;
if (shell_process_.pid() == pid_voice) log_ss << ", terminate: " << shell_process_.terminate();
cur_voice_path = "";
}
} else {
log_ss << ", not exist";
}
}
void stop(const std::string &voice_path) {
LogStringStream log_ss;
log_ss << __PRETTY_FUNCTION__ << ", voice_path: " << voice_path;
std::lock_guard<std::mutex> lk_shell_process(shell_process_mutex_);
log_ss << ", cur_voice: " << cur_voice_path;
if (voice_path == cur_voice_path) {
log_ss << ", terminate: " << shell_process_.terminate();
cur_voice_path = "";
}
}
void stop() {
LogStringStream log_ss;
log_ss << __PRETTY_FUNCTION__;
std::lock_guard<std::mutex> lk_shell_process(shell_process_mutex_);
log_ss << ", terminate: " << shell_process_.terminate();
cur_voice_path = "";
}
private:
std::mutex shell_process_mutex_;
ShellProcess shell_process_;
std::string cur_voice_path;
};