从音频播放C++类的实现学习Linux系统子进程管理

为了实现一个可以及时停止的音频播放类,学习使用fork、execl、setpgid、waitpid和进程组概念。

之前车端软件播放音频文件都是在代码中使用popen、system等库函数执行Shell命令aplay播放完整的短音频,不需要被打断。而这次的需求需要声音被及时打断,使用popen等库函数皆不能优雅的提前终止,所以有了此次造轮子的行为。其实boost库中有process模块,也能提前终止进程,但是看其示例感觉比较复杂容易使用不当,其中spawn、group等用法让人不看源代码、不学习底层原理,就不敢轻易使用。权衡了一下,总之都要学习一下进程管理,不如自己造个轮子,也能实操加深理解。

代码的地址在这里

在遇到另一篇文章中问题诊断: 小手册系列「1」的“popen后台符号&导致僵尸进程”问题时,我就观察到用popen执行非内建命令时会至少两个进程,一个是/bin/sh,另一个才是输入命令的进程,并且如果执行kill命令杀死/bin/sh进程并不会导致输入命令的进程退出。原因是popen内部先创建Shell进程将用户命令作为参数传给Shell进程执行,Shell进程会创建子进程执行实际的命令,而kill父进程的pid不会将信号发给子进程。

对于本需求需要主动结束播放音频进程,如果使用popen则难以获取播放音频进程的pid;如果使用forkexec*系统调用创建子进程可以拿到子进程id,当然可以直接exec*创建播放音频进程并且拿到其pid,通过kill函数可以提前终止。而播放音频命令aplay不支持循环播放,这种做法需要在外部进行循环调用并不优雅,而且假设播放音频命令也创建子进程,kill命令不能结束子进程,可能会造成资源的泄漏。所以打算和popen一样执行Shell命令,可以使用while等命令实现循环调用,使用进程组就可以将结束信号发给Shell进程和其子进程

下面主要介绍下进程组概念,进程组和会话主要用于Shell作业控制。

进程组由一个或多个共享同一进程组标识符(PGID)的进程组成,进程组标识符(PGID)等于首次创建进程组的进程pid,其生命周期为创建时刻开始,持续到最后一个进程退出进程组,在其生命周期不会有和PGID相同的pid分配给新进程。子进程会继承父进程的进程组

会话则是一组进程组的集合。进程的会话成员关系是由其会话标识符(SID)确定的,会话标识符也是会话创建进程的pid。下图来自1,表示了进程组和会话之间的关系。

进程组、会话和控制终端之间的关系进程组
进程组、会话和控制终端之间的关系进程组

上图可以看到像sort < file | uniq -c带有管道的命令会创建两个进程且在同一进程组,这样我们在前台执行ctrl+c时的终端会通过killpg将信号发送给进程组中的每个进程,从而结束这条命令。利用这个特点,我们可以fork后的子进程设置其为进程组的首进程,在父进程就可以通过killpg发送信号,杀死子进程和其产生的子进程,从而达到目的。

c++

pid_t exec(const std::string &cmd) {
    LogStringStream log_ss;
    log_ss << __PRETTY_FUNCTION__ << ", cmd: " << cmd;
    if (cmd.empty()) return -1;
    pid_t pid = fork();
    if (pid == 0) {
        ::setpgid(0, 0);
        ::execl("/bin/sh", "sh", "-c", cmd.c_str(), nullptr);
        LOG_ERROR << "execl failure!";
        _exit(EXIT_FAILURE);
    } else if (pid < 0) {
        LOG_ERROR << "fork failed: " << pid;
    } else {
        log_ss << ", pid: " << pid << ", setpgid ret: " << ::setpgid(pid, pid);
    }
    return pid;
}

核心代码如上所示,其中setpgid是用来将进程为pid的进程的进程组修改为pgid,定义如下:

c

#include <unistd.h>

int setpgid(pid_t pid, pid_t pgid);

如果将pid的值设置为0,那么就会设置调用进程的进程组。如果将pgid的值设置为0,那么设置的进程组pgid等于调用进程的pid。所以在fork后的子进程中可以直接调用::setpgid(0, 0);将自己设置为进程组的首进程,即其pid==pgid,从而使得其创建的子进程都继承其进程组id。父进程即可通过killpg(pid, sig)向/bin/sh进程和其产生的子进程发送sig信号。一个进程在其子进程已经执行exec()后就无法修改该子进程的进程组pgid了。

需要特别注意的是在父进程也调用了::setpgid(pid, pid),重复设置子进程的进程组id。这是进程安全的做法,由于fork后子进程和父进程执行顺序不确定,如果只在子进程设置,可能出现子进程还没设置其线程组,而父进程就对线程组调用诸如killpg等命令造成并发问题。所以在父进程调用是很有必要的。

-c选项Shell会直接执行该命令,而不是创建一个子Shell进程。

在循环播放音频时,如果父进程崩溃,并不会影响到子进程继续执行,此时声音就会一直被播放,这是不能被接受的。所以考虑在循环执行播放命令时做判断其父进程是否存在。

我的实现如下,通过持续判断当前进程的父进程(/proc/$$/stat文件中动态更新)是否等于进程创建时的父进程id($PPID)来判断父进程是否活着,如果父进程崩溃了,该子进程会被系统进程收养,其父进程id会改变。之所以没有直接判断$PPID进程是否还存在,是因为$PPID进程号可能被新启动的进程分配到,这种情况下主进程虽然已经崩溃,但是$PPID却还存在。

shell

while [ $PPID -eq $(cut -d ' ' -f 4 /proc/$$/stat) ]; do 
    aplay voice_path
done

对上述进程管理操作,我封装了一个ShellProcess类,用于管理子进程,其只有一个数据成员pid_保存子进程的pid。同时封装VoicePlayer使用ShellProcess管理子进程播放音频。

在实现阻塞播放一次音频时,发现了一个难点:首先对于shell_process_对象的调用需要加锁保护防止多线程竞争,但是我不希望在阻塞播放音频时全程持有这个锁,这样就不能在其它线程调用stop(其也需要加锁)提前终止这次播放。

我的实现是在加锁的状态下进行exec播放进程并保存其pid到栈上。然后释放锁再调用waitpid,此时其它线程就可以随时stop了。之后在waitpid返回子进程结束后在, 再申请锁将shell_process_对象持有的pid赋值为-1,表明子进程已经结束。当然为了防止竟态条件,只在该对象持有的pid==保存的pid时才进行操作。

c++

void playOnceBlock(const std::string &voice_path) {
    LogStringStream log_ss;
    log_ss << __PRETTY_FUNCTION__ << ", path: " << voice_path;
    if (check_file_exist(voice_path)) {
        pid_t pid_voice = -1;
        {
            std::lock_guard<std::mutex> lk_shell_process(shell_process_mutex_);
            shell_process_ = ShellProcess("aplay " + voice_path);
            pid_voice = shell_process_.pid();
            log_ss << ", pid: " << pid_voice;
            cur_voice_path = voice_path;
        }
        log_ss.flush_log();
        if (pid_voice > 0) {
            int status = -1;
            int waitpid_ret = ::waitpid(pid_voice, &status, 0);  // waitpid阻塞时不持有锁, 其它线程可调用stop提前结束
            log_ss << "waitpid, status: " << status << ", exited: " << (WIFEXITED(status) ? WEXITSTATUS(status) : -1)
                    << ", signal: " << (WIFSIGNALED(status) ? WTERMSIG(status) : -1) << ", waitpid_ret: " << waitpid_ret;
        }
        {
            std::lock_guard<std::mutex> lk_shell_process(shell_process_mutex_);
            log_ss << "cur_pid: " << shell_process_.pid() << ", pid_voice: " << pid_voice;
            if (shell_process_.pid() == pid_voice) log_ss << ", terminate: " << shell_process_.terminate();
            cur_voice_path = "";
        }
    } else {
        log_ss << ", not exist";
    }
}

update on 2024.06.25.

c++

#define LOG_ERROR std::cerr << std::endl
#define LOG_INFO std::cout << std::endl

bool check_file_exist(const std::string &file_path) {
    std::ifstream file(file_path);
    return file.good();
}

class LogStringStream : public std::ostringstream {
public:
    ~LogStringStream() {
        auto log_str = this->str();
        if (log_str.size()) {
            LOG_INFO << log_str;
        }
    }

    void flush_log() {
        auto log_str = this->str();
        if (log_str.size()) {
            LOG_INFO << log_str;
            this->str("");
        }
    }
};
class DeferFunction {
public:
    explicit DeferFunction(const std::function<void()> &defer_func) : defer_func_(defer_func) {}

    ~DeferFunction() { defer_func_(); }

private:
    std::function<void()> defer_func_;
};

class ShellProcess {
public:
    ShellProcess() = default;

    ShellProcess(const std::string &cmd) : pid_(exec(cmd)) {}

    ShellProcess(ShellProcess &&lhs) noexcept : pid_(lhs.pid_) { lhs.pid_ = -1; }

    ShellProcess &operator=(ShellProcess &&lhs) noexcept {
        if (this != &lhs) {
            terminate();
            this->pid_ = lhs.pid_;
            lhs.pid_ = -1;
        }
        return *this;
    }

    ShellProcess(const ShellProcess &) = delete;
    ShellProcess &operator=(const ShellProcess &) = delete;

    int terminate() {
        if (pid_ > 0) {
            LogStringStream log_ss;
            log_ss << __PRETTY_FUNCTION__ << ", pid: " << pid_;
            int killpg_ret = ::killpg(pid_, SIGKILL);
            log_ss << ", killpg_ret: " << killpg_ret;
            std::this_thread::yield();
            if (killpg_ret < 0) {
                log_ss << ", error: " << strerror(errno);
            }
            return waitNoBlock();
        }
        return -1;
    }

    int wait() {
        LogStringStream log_ss;
        log_ss << __PRETTY_FUNCTION__ << ", pid: " << pid_;
        if (pid_ > 0) {
            int status = -1;
            int waitpid_ret = ::waitpid(pid_, &status, 0);
            pid_ = -1;
            log_ss << ", status: " << status << ", exited: " << (WIFEXITED(status) ? WEXITSTATUS(status) : -1)
                   << ", signal: " << (WIFSIGNALED(status) ? WTERMSIG(status) : -1) << ", waitpid_ret: " << waitpid_ret;
            if (waitpid_ret < 0) {
                log_ss << ", error: " << strerror(errno);
                return -1;
            }
            return status;
        }
        return -1;
    }

    ~ShellProcess() { terminate(); }

    pid_t pid() const { return pid_; };

private:
    pid_t exec(const std::string &cmd) {
        LogStringStream log_ss;
        log_ss << __PRETTY_FUNCTION__ << ", cmd: " << cmd;
        if (cmd.empty()) return -1;
        pid_t pid = fork();
        if (pid == 0) {
            ::setpgid(0, 0);
            ::execl("/bin/sh", "sh", "-c", cmd.c_str(), nullptr);
            LOG_ERROR << "execl failure!";
            _exit(EXIT_FAILURE);
        } else if (pid < 0) {
            LOG_ERROR << "fork failed: " << pid;
        } else {
            log_ss << ", pid: " << pid << ", setpgid ret: " << ::setpgid(pid, pid);
        }
        return pid;
    }

    int waitNoBlock() {
        DeferFunction defer_func([this] { this->pid_ = -1; });
        LogStringStream log_ss;
        log_ss << __PRETTY_FUNCTION__ << ", pid: " << pid_;
        if (pid_ > 0) {
            static const int max_retry_times = 15;
            int status = -1;
            int waitpid_ret = 0;
            int retry_times = max_retry_times;
            do {
                waitpid_ret = ::waitpid(pid_, &status, WNOHANG);
                if (max_retry_times - retry_times < 5)
                    std::this_thread::sleep_for(std::chrono::milliseconds(10));
                else
                    std::this_thread::yield();

            } while (--retry_times > 0 && ((waitpid_ret == 0) || (waitpid_ret == -1 && errno == EINTR) ||
                                           (waitpid_ret != -1 && !WIFEXITED(status) && !WIFSIGNALED(status))));
            log_ss << ", retry_times: " << retry_times + 1 << ", status: " << status
                   << ", exited: " << (WIFEXITED(status) ? WEXITSTATUS(status) : -1)
                   << ", signal: " << (WIFSIGNALED(status) ? WTERMSIG(status) : -1) << ", waitpid_ret: " << waitpid_ret;
            return status;
        }
        return -1;
    }

    pid_t pid_ = -1;
};

class VoicePlayer {
public:
    void playContinuous(const std::string &voice_path) {
        LogStringStream log_ss;
        log_ss << __PRETTY_FUNCTION__ << ", path: " << voice_path;
        if (check_file_exist(voice_path)) {
            std::lock_guard<std::mutex> lk_shell_process(shell_process_mutex_);
            shell_process_ = ShellProcess("while [ $PPID -eq $(cut -d ' ' -f 4 /proc/$$/stat) ]; do aplay " + voice_path + "; done");
            log_ss << ", pid: " << shell_process_.pid();
            cur_voice_path = voice_path;
        } else {
            log_ss << ", not exist";
        }
    }

    void playOnce(const std::string &voice_path) {
        LogStringStream log_ss;
        log_ss << __PRETTY_FUNCTION__ << ", path: " << voice_path;
        if (check_file_exist(voice_path)) {
            std::lock_guard<std::mutex> lk_shell_process(shell_process_mutex_);
            shell_process_ = ShellProcess("aplay " + voice_path);
            log_ss << ", pid: " << shell_process_.pid();
            cur_voice_path = voice_path;
        } else {
            log_ss << ", not exist";
        }
    }

    void playOnceBlock(const std::string &voice_path) {
        LogStringStream log_ss;
        log_ss << __PRETTY_FUNCTION__ << ", path: " << voice_path;
        if (check_file_exist(voice_path)) {
            pid_t pid_voice = -1;
            {
                std::lock_guard<std::mutex> lk_shell_process(shell_process_mutex_);
                shell_process_ = ShellProcess("aplay " + voice_path);
                pid_voice = shell_process_.pid();
                log_ss << ", pid: " << pid_voice;
                cur_voice_path = voice_path;
            }
            log_ss.flush_log();
            if (pid_voice > 0) {
                int status = -1;
                int waitpid_ret = ::waitpid(pid_voice, &status, 0);  // waitpid阻塞时不持有锁, 其它线程可调用stop提前结束
                log_ss << "waitpid, status: " << status << ", exited: " << (WIFEXITED(status) ? WEXITSTATUS(status) : -1)
                       << ", signal: " << (WIFSIGNALED(status) ? WTERMSIG(status) : -1) << ", waitpid_ret: " << waitpid_ret;
            }
            {
                std::lock_guard<std::mutex> lk_shell_process(shell_process_mutex_);
                log_ss << "cur_pid: " << shell_process_.pid() << ", pid_voice: " << pid_voice;
                if (shell_process_.pid() == pid_voice) log_ss << ", terminate: " << shell_process_.terminate();
                cur_voice_path = "";
            }
        } else {
            log_ss << ", not exist";
        }
    }

    void stop(const std::string &voice_path) {
        LogStringStream log_ss;
        log_ss << __PRETTY_FUNCTION__ << ", voice_path: " << voice_path;
        std::lock_guard<std::mutex> lk_shell_process(shell_process_mutex_);
        log_ss << ", cur_voice: " << cur_voice_path;
        if (voice_path == cur_voice_path) {
            log_ss << ", terminate: " << shell_process_.terminate();
            cur_voice_path = "";
        }
    }

    void stop() {
        LogStringStream log_ss;
        log_ss << __PRETTY_FUNCTION__;
        std::lock_guard<std::mutex> lk_shell_process(shell_process_mutex_);
        log_ss << ", terminate: " << shell_process_.terminate();
        cur_voice_path = "";
    }

private:
    std::mutex shell_process_mutex_;
    ShellProcess shell_process_;
    std::string cur_voice_path;
};