#ifndef HW_THREAD_POOL_H_ #define HW_THREAD_POOL_H_ #include #include #include #include #include #include #include #include #include #include #include "hlog.h" #include "hthread.h" class HThreadPool { public: using Task = std::function; HThreadPool(int size = std::thread::hardware_concurrency()) : pool_size(size), idle_num(size), status(STOP) { } ~HThreadPool() { stop(); } int start() { if (status == STOP) { status = RUNNING; for (int i = 0; i < pool_size; ++i) { workers.emplace_back(std::thread([this]{ hlogd("work thread[%X] running...", gettid()); while (status != STOP) { while (status == PAUSE) { std::this_thread::yield(); } Task task; { std::unique_lock locker(mutex); cond.wait(locker, [this]{ return status == STOP || !tasks.empty(); }); if (status == STOP) return; if (!tasks.empty()) { task = std::move(tasks.front()); tasks.pop(); } } --idle_num; task(); ++idle_num; } })); } } return 0; } int stop() { if (status != STOP) { status = STOP; cond.notify_all(); for (auto& thread : workers) { thread.join(); } } return 0; } int pause() { if (status == RUNNING) { status = PAUSE; } return 0; } int resume() { if (status == PAUSE) { status = RUNNING; } return 0; } // return a future, calling future.get() will wait task done and return RetType. // commit(fn, args...) // commit(std::bind(&Class::mem_fn, &obj)) // commit(std::mem_fn(&Class::mem_fn, &obj)) template auto commit(Fn&& fn, Args&&... args) -> std::future { using RetType = decltype(fn(args...)); auto task = std::make_shared >( std::bind(std::forward(fn), std::forward(args)...)); std::future future = task->get_future(); { std::lock_guard locker(mutex); tasks.emplace([task]{ (*task)(); }); } cond.notify_one(); return future; } public: int pool_size; std::atomic idle_num; enum Status { STOP, RUNNING, PAUSE, }; std::atomic status; std::vector workers; std::queue tasks; std::mutex mutex; std::condition_variable cond; }; #endif // HW_THREAD_POOL_H_