#ifndef HV_THREAD_POOL_H_
#define HV_THREAD_POOL_H_

#include <vector>
#include <thread>
#include <queue>
#include <functional>
#include <atomic>
#include <mutex>
#include <condition_variable>
#include <future>
#include <memory>
#include <utility>

// Fixed-size thread pool executing queued `void()` tasks in FIFO order.
//
// Lifecycle: a pool is created in the STOP state; start() spawns the workers,
// pause()/resume() temporarily suspend task pick-up, stop() joins the workers.
// Tasks still queued when stop() is called are left in the queue and are
// executed if the pool is started again.
//
// Every change of `status` and every access to `tasks` made by this class
// happens while holding `_mutex`, so a worker blocked on `_cond` can never
// miss a state change (no lost wake-up).
class HThreadPool {
public:
    using Task = std::function<void()>;

    // @param size  number of worker threads. hardware_concurrency() is allowed
    //              to return 0, and a pool without workers would never run
    //              anything, so non-positive sizes are clamped to 1.
    // Note: idle_num is initialised from pool_size, which is declared (and
    // therefore initialised) before it.
    HThreadPool(int size = std::thread::hardware_concurrency())
        : pool_size(size > 0 ? size : 1), idle_num(pool_size), status(STOP) {
    }

    // Stops the pool and joins all workers.
    ~HThreadPool() {
        stop();
    }

    // The pool owns threads and a mutex; it cannot be copied.
    HThreadPool(const HThreadPool&) = delete;
    HThreadPool& operator=(const HThreadPool&) = delete;

    // Spawns `pool_size` workers if the pool is stopped; otherwise a no-op.
    // @return always 0.
    int start() {
        std::lock_guard<std::mutex> locker(_mutex);
        if (status != STOP) {
            return 0;
        }
        status = RUNNING;
        for (int i = 0; i < pool_size; ++i) {
            workers.emplace_back([this] {
                for (;;) {
                    Task task;
                    {
                        std::unique_lock<std::mutex> lock(_mutex);
                        // Sleep until the pool is stopped, or until it is
                        // running and there is work. While paused the worker
                        // stays blocked here instead of spinning.
                        _cond.wait(lock, [this] {
                            return status == STOP ||
                                   (status == RUNNING && !tasks.empty());
                        });
                        if (status == STOP) {
                            return;
                        }
                        task = std::move(tasks.front());
                        tasks.pop();
                        // Popping and the idle counter change together under
                        // the lock so wait() sees a consistent snapshot.
                        --idle_num;
                    }
                    task();
                    ++idle_num;
                }
            });
        }
        return 0;
    }

    // Stops the pool and joins every worker; a no-op if already stopped.
    // Running tasks are allowed to finish, queued tasks are not executed.
    // Must not be called from inside a task (a worker cannot join itself).
    // @return always 0.
    int stop() {
        std::vector<std::thread> joining;
        {
            std::lock_guard<std::mutex> locker(_mutex);
            if (status == STOP) {
                return 0;
            }
            status = STOP;
            // Take ownership of the threads so `workers` is empty for a later
            // start() and no thread is ever joined twice.
            joining.swap(workers);
        }
        _cond.notify_all();
        for (auto& worker : joining) {
            if (worker.joinable()) {
                worker.join();
            }
        }
        return 0;
    }

    // Suspends task pick-up. Tasks already executing run to completion;
    // queued tasks stay queued until resume(). No-op unless RUNNING.
    // @return always 0.
    int pause() {
        std::lock_guard<std::mutex> locker(_mutex);
        if (status == RUNNING) {
            status = PAUSE;
        }
        return 0;
    }

    // Resumes a paused pool and wakes the workers so that tasks queued during
    // the pause are picked up. No-op unless PAUSE.
    // @return always 0.
    int resume() {
        {
            std::lock_guard<std::mutex> locker(_mutex);
            if (status != PAUSE) {
                return 0;
            }
            status = RUNNING;
        }
        _cond.notify_all();
        return 0;
    }

    // Blocks (by polling with yield) until the pool is stopped, or the queue
    // is empty and every worker is idle. If the pool is paused with tasks
    // pending, this does not return until the pool is resumed or stopped.
    // @return always 0.
    int wait() {
        for (;;) {
            {
                // `tasks` is a plain std::queue: it must only be inspected
                // under the mutex that guards push/pop.
                std::lock_guard<std::mutex> locker(_mutex);
                if (status == STOP || (tasks.empty() && idle_num == pool_size)) {
                    break;
                }
            }
            std::this_thread::yield();
        }
        return 0;
    }

    // Queues fn(args...) for execution and returns a future for its result;
    // calling future.get() waits until the task is done and returns RetType
    // (or rethrows the exception the task threw).
    // The callable and its arguments are bound by value (std::bind); use
    // std::ref to pass references.
    // Usage:
    //   commit(fn, args...)
    //   commit(std::bind(&Class::mem_fn, &obj))
    //   commit(std::mem_fn(&Class::mem_fn), &obj)
    template<class Fn, class... Args>
    auto commit(Fn&& fn, Args&&... args) -> std::future<decltype(fn(args...))> {
        using RetType = decltype(fn(args...));
        // packaged_task is move-only while Task (std::function) requires a
        // copyable callable, hence the shared_ptr indirection.
        auto task = std::make_shared<std::packaged_task<RetType()> >(
            std::bind(std::forward<Fn>(fn), std::forward<Args>(args)...));
        std::future<RetType> future = task->get_future();
        {
            std::lock_guard<std::mutex> locker(_mutex);
            tasks.emplace([task] {
                (*task)();
            });
        }
        _cond.notify_one();
        return future;
    }

public:
    enum Status {
        STOP,
        RUNNING,
        PAUSE,
    };
    int                         pool_size;  // number of worker threads
    std::atomic<int>            idle_num;   // workers not currently running a task
    std::atomic<Status>         status;     // lifecycle state, see Status
    std::vector<std::thread>    workers;    // live worker threads (empty when stopped)
    std::queue<Task>            tasks;      // pending tasks, guarded by _mutex

protected:
    std::mutex              _mutex;  // guards tasks, workers and status transitions
    std::condition_variable _cond;   // signalled on new task, resume and stop
};

#endif // HV_THREAD_POOL_H_