hthreadpool.h 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128
  1. #ifndef H_THREAD_POOL_H
  2. #define H_THREAD_POOL_H
  3. #include <vector>
  4. #include <thread>
  5. #include <queue>
  6. #include <functional>
  7. #include <atomic>
  8. #include <mutex>
  9. #include <condition_variable>
  10. #include <future>
  11. #include "hlog.h"
  12. #include "hthread.h"
  13. class HThreadPool{
  14. public:
  15. using Task = std::function<void()>;
  16. HThreadPool(int size = std::thread::hardware_concurrency()) : pool_size(size), idle_num(size), status(STOP){
  17. }
  18. ~HThreadPool(){
  19. stop();
  20. }
  21. int start() {
  22. if (status == STOP) {
  23. status = RUNNING;
  24. for (int i = 0; i < pool_size; ++i){
  25. workers.emplace_back(std::thread([this]{
  26. hlogd("work thread[%X] running...", gettid());
  27. while (status != STOP){
  28. while (status == PAUSE){
  29. std::this_thread::yield();
  30. }
  31. Task task;
  32. {
  33. std::unique_lock<std::mutex> locker(mutex);
  34. cond.wait(locker, [this]{
  35. return status == STOP || !tasks.empty();
  36. });
  37. if (status == STOP) return;
  38. if (!tasks.empty()){
  39. task = std::move(tasks.front());
  40. tasks.pop();
  41. }
  42. }
  43. --idle_num;
  44. task();
  45. ++idle_num;
  46. }
  47. }));
  48. }
  49. }
  50. return 0;
  51. }
  52. int stop() {
  53. if (status != STOP) {
  54. status = STOP;
  55. cond.notify_all();
  56. for (auto& thread: workers){
  57. thread.join();
  58. }
  59. }
  60. return 0;
  61. }
  62. int pause() {
  63. if (status == RUNNING) {
  64. status = PAUSE;
  65. }
  66. return 0;
  67. }
  68. int resume() {
  69. if (status == PAUSE) {
  70. status = RUNNING;
  71. }
  72. return 0;
  73. }
  74. // return a future, calling future.get() will wait task done and return RetType.
  75. // commit(fn, args...)
  76. // commit(std::bind(&Class::mem_fn, &obj))
  77. // commit(std::mem_fn(&Class::mem_fn, &obj))
  78. template<class Fn, class... Args>
  79. auto commit(Fn&& fn, Args&&... args) -> std::future<decltype(fn(args...))>{
  80. using RetType = decltype(fn(args...));
  81. auto task = std::make_shared<std::packaged_task<RetType()> >(
  82. std::bind(std::forward<Fn>(fn), std::forward<Args>(args)...)
  83. );
  84. std::future<RetType> future = task->get_future();
  85. {
  86. std::lock_guard<std::mutex> locker(mutex);
  87. tasks.emplace([task]{
  88. (*task)();
  89. });
  90. }
  91. cond.notify_one();
  92. return future;
  93. }
  94. public:
  95. int pool_size;
  96. std::atomic<int> idle_num;
  97. enum Status {
  98. STOP,
  99. RUNNING,
  100. PAUSE,
  101. };
  102. std::atomic<Status> status;
  103. std::vector<std::thread> workers;
  104. std::queue<Task> tasks;
  105. std::mutex mutex;
  106. std::condition_variable cond;
  107. };
  108. #endif // H_THREAD_POOL_H