wsz 发表于 2025-2-5 18:24:40

HiJobQueue:一个简单的线程安全任务队列

HiJobQueue:一个简单的线程安全任务队列

概述

HiJobQueue 是一个线程安全的任务队列,用于在多线程环境中管理和执行异步任务。它的设计参考了 Cobalt 项目中的 JobQueue,并做了适当的简化。HiJobQueue 提供了任务推送(push)、任务弹出(pop)、队列退出(quit)等功能,适用于需要异步任务调度的场景。
<hr>核心功能


[*]线程安全:

[*]使用 std::mutex 和 std::condition_variable 实现线程安全的任务队列。

[*]任务调度:

[*]支持任务的异步推送和弹出。

[*]退出机制:

[*]提供 quit() 方法,用于安全地停止任务队列。

[*]跨平台:

[*]使用 C++ 标准库实现,不依赖平台特定的 API。

<hr>实现代码

以下是 HiJobQueue 的实现代码:
#pragma once#include <mutex>#include <functional>#include <queue>#include <condition_variable>/** * @brief 线程安全的任务队列,用于管理和执行异步任务。 */class HiJobQueue final {public:    using Job = std::function<void()>; // 任务类型public:    HiJobQueue() : is_exit_(false) {}    /**   * @brief 推送任务到队列。   * @param job 要执行的任务。   * @return 如果队列已退出,返回 false;否则返回 true。   */    bool push(Job job);    /**   * @brief 从队列中弹出任务。   * @param job 用于存储弹出的任务。   * @return 如果队列为空且已退出,返回 false;否则返回 true。   */    bool pop(Job& job);    /**   * @brief 获取队列中的任务数量。   * @return 队列中的任务数量。   */    size_t size();    /**   * @brief 退出队列,停止任务处理。   */    void quit();    /**   * @brief 检查队列是否已退出。   * @return 如果队列已退出,返回 true;否则返回 false。   */    bool is_quited();    // 禁用拷贝构造函数和赋值运算符    HiJobQueue(HiJobQueue&) = delete;    HiJobQueue(const HiJobQueue&) = delete;private:    bool is_exit_;               // 队列退出标志    std::mutex mutex_;             // 互斥锁,保护队列访问    std::condition_variable cond_; // 条件变量,用于任务通知    std::queue<Job> queue_;      // 任务队列};// 实现bool HiJobQueue::push(Job job) {    std::lock_guard<std::mutex> locker(mutex_);    if (is_exit_) {      return false;    }    queue_.push(std::move(job));    cond_.notify_one();    return true;}bool HiJobQueue::pop(Job& job) {    std::unique_lock<std::mutex> locker(mutex_);    cond_.wait(locker, () { return is_exit_ || !queue_.empty(); });    if (is_exit_ && queue_.empty()) {      return false;    }    job = std::move(queue_.front());    queue_.pop();    return true;}size_t HiJobQueue::size() {    std::lock_guard<std::mutex> locker(mutex_);    return queue_.size();}void HiJobQueue::quit() {    std::lock_guard<std::mutex> locker(mutex_);    is_exit_ = true;    cond_.notify_all();}bool HiJobQueue::is_quited() {    std::lock_guard<std::mutex> locker(mutex_);    return is_exit_;}<hr>测试用例

为了验证 HiJobQueue 的正确性和线程安全性,我们设计了以下测试用例:
测试代码

#include <gtest/gtest.h>#include <future>#include <atomic>#include <thread>#include <chrono>#include "hi_job_queue.h"class TestCls {public:    void test(const char* text, int i) {      printf("%s-%d\n", text, i);    }};TEST(HiJobQueueTest, ConcurrentPushPop) {    HiJobQueue queue;    TestCls cls;    std::atomic<int> job_count{0};   auto f1 = std::async(std::launch::async, [&] {      HiJobQueue::Job job;      while (queue.pop(job)) {            job();            job_count++;      }    });    auto f2 = std::async(std::launch::async, [&] {      HiJobQueue::Job job;      while (queue.pop(job)) {            job();            job_count++;      }    });    auto f3 = std::async(std::launch::async, [&] {      for (int i = 0; i < 200; i++) {            queue.push(std::bind(&TestCls::test, &cls, "test1", i));            std::this_thread::sleep_for(std::chrono::milliseconds(5)); // 跨平台休眠      }    });    auto f4 = std::async(std::launch::async, [&] {      for (int i = 0; i < 200; i++) {            queue.push(std::bind(&TestCls::test, &cls, "test2", i));            std::this_thread::sleep_for(std::chrono::milliseconds(5)); // 跨平台休眠      }    });    f3.wait();    f4.wait();    queue.quit();    f1.wait();    f2.wait();    // 验证所有任务被执行    EXPECT_EQ(job_count.load(), 400); // 200 (test1) + 200 (test2)}TEST(HiJobQueueTest, QuitBehavior) {    HiJobQueue queue;    auto consumer = std::async(std::launch::async, [&] {      HiJobQueue::Job job;      while (queue.pop(job)) {            job();      }    });    for (int i = 0; i < 10; i++) {      queue.push([]() {});    }    queue.quit();    consumer.wait();    EXPECT_TRUE(queue.is_quited());    EXPECT_FALSE(queue.push([]() {}));}TEST(HiJobQueueTest, EmptyQueueBehavior) {    HiJobQueue queue;    HiJobQueue::Job job;    bool pop_result = false;    auto pop_thread = std::async(std::launch::async, [&] {      pop_result = queue.pop(job);    });    std::this_thread::sleep_for(std::chrono::milliseconds(100));    EXPECT_FALSE(pop_result);    queue.quit();    pop_thread.wait();    EXPECT_FALSE(pop_result);    EXPECT_FALSE(queue.pop(job));}<hr>测试用例说明


[*]ConcurrentPushPop:

[*]测试多线程环境下 push 和 pop 的并发行为。
[*]验证所有任务是否被正确执行。

[*]QuitBehavior:

[*]测试队列退出时的行为。
[*]验证退出后是否不再接受新任务。

[*]EmptyQueueBehavior:

[*]测试队列为空时的行为。
[*]验证退出后 pop 的行为。

<hr>适用场景

HiJobQueue 适用于以下场景:

[*]多线程任务调度:

[*]在需要将任务分发到多个工作线程执行的场景中,HiJobQueue 可以作为任务调度器使用。
[*]例如:线程池中的任务队列。

[*]事件驱动架构:

[*]在事件驱动的系统中,HiJobQueue 可以用于存储和处理事件。
[*]例如:GUI 应用中的事件队列。

[*]异步任务处理:

[*]在需要异步执行任务的场景中,HiJobQueue 可以用于存储任务并由后台线程处理。
[*]例如:日志系统的异步写入。

[*]生产者-消费者模型:

[*]在生产者-消费者模型中,HiJobQueue 可以作为共享的任务缓冲区。
[*]例如:多线程下载任务的分发。

<hr>优缺点分析

优点


[*]线程安全:

[*]使用 std::mutex 和 std::condition_variable 确保多线程环境下的安全性。

[*]简单易用:

[*]提供了简洁的接口(push、pop、quit),易于集成到现有项目中。

[*]跨平台:

[*]基于 C++ 标准库实现,不依赖平台特定的 API,具有良好的可移植性。

[*]退出机制:

[*]提供 quit() 方法,可以安全地停止任务队列,避免资源泄漏。

[*]轻量级:

[*]代码简洁,性能开销小,适合对性能要求较高的场景。

<hr>缺点


[*]功能单一:

[*]仅支持基本的任务队列功能,不支持优先级调度或任务取消。

[*]性能瓶颈:

[*]在高并发场景下,std::mutex 可能成为性能瓶颈。
[*]如果需要更高的性能,可以考虑无锁队列(如 boost::lockfree::queue)。

[*]任务类型限制:

[*]任务类型为 std::function<void()>,不支持返回值或参数传递。
[*]如果需要更复杂的任务类型,需要自行扩展。

[*]缺乏任务状态管理:

[*]不支持任务的状态管理(如任务完成通知或错误处理)。

[*]退出时未执行任务:

[*]在调用 quit() 方法退出队列时,如果仍有未执行的任务,这些任务会被直接丢弃。
[*]如果任务中存在阻塞操作(如等待 I/O、锁、条件变量等),可能导致进程无法及时退出。
例如:在希望进程退出时,如果任务队列中有未执行的任务,且任务中存在阻塞操作,进程可能会卡住,无法正常退出。

<hr>总结

HiJobQueue 是一个简单但功能强大的线程安全任务队列,适用于多线程环境中的异步任务调度。通过参考 Cobalt 项目中的 JobQueue,我们实现了一个更轻量级的版本,并通过单元测试验证了其正确性和线程安全性。希望这篇文章能帮助你理解和使用 HiJobQueue。
页: [1]
查看完整版本: HiJobQueue:一个简单的线程安全任务队列