#ifndef H_THREAD_POOL_H #define H_THREAD_POOL_H #include #include #include #include #include #include #include #include #include "hlog.h" #include "hthread.h" class HThreadPool{ public: using Task = std::function; HThreadPool(int size = std::thread::hardware_concurrency()) : pool_size(size), idle_num(size), status(STOP){ } ~HThreadPool(){ stop(); } int start() { if (status == STOP) { status = RUNNING; for (int i = 0; i < pool_size; ++i){ workers.emplace_back(std::thread([this]{ hlogd("work thread[%X] running...", gettid()); while (status != STOP){ while (status == PAUSE){ std::this_thread::yield(); } Task task; { std::unique_lock locker(mutex); cond.wait(locker, [this]{ return status == STOP || !tasks.empty(); }); if (status == STOP) return; if (!tasks.empty()){ task = std::move(tasks.front()); tasks.pop(); } } --idle_num; task(); ++idle_num; } })); } } return 0; } int stop() { if (status != STOP) { status = STOP; cond.notify_all(); for (auto& thread: workers){ thread.join(); } } return 0; } int pause() { if (status == RUNNING) { status = PAUSE; } return 0; } int resume() { if (status == PAUSE) { status = RUNNING; } return 0; } // return a future, calling future.get() will wait task done and return RetType. // commit(fn, args...) // commit(std::bind(&Class::mem_fn, &obj)) // commit(std::mem_fn(&Class::mem_fn, &obj)) template auto commit(Fn&& fn, Args&&... args) -> std::future{ using RetType = decltype(fn(args...)); auto task = std::make_shared >( std::bind(std::forward(fn), std::forward(args)...) ); std::future future = task->get_future(); { std::lock_guard locker(mutex); tasks.emplace([task]{ (*task)(); }); } cond.notify_one(); return future; } public: int pool_size; std::atomic idle_num; enum Status { STOP, RUNNING, PAUSE, }; std::atomic status; std::vector workers; std::queue tasks; std::mutex mutex; std::condition_variable cond; }; #endif // H_THREAD_POOL_H