hthreadpool.h 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139
  1. #ifndef HV_THREAD_POOL_H_
  2. #define HV_THREAD_POOL_H_
  3. #include <vector>
  4. #include <thread>
  5. #include <queue>
  6. #include <functional>
  7. #include <atomic>
  8. #include <mutex>
  9. #include <condition_variable>
  10. #include <future>
  11. #include <memory>
  12. #include <utility>
  13. //#include "hlog.h"
  14. #include "hthread.h"
  15. class HThreadPool {
  16. public:
  17. using Task = std::function<void()>;
  18. HThreadPool(int size = std::thread::hardware_concurrency())
  19. : pool_size(size), idle_num(size), status(STOP) {
  20. }
  21. ~HThreadPool() {
  22. stop();
  23. }
  24. int start() {
  25. if (status == STOP) {
  26. status = RUNNING;
  27. for (int i = 0; i < pool_size; ++i) {
  28. workers.emplace_back(std::thread([this]{
  29. //hlogd("work thread[%X] running...", gettid());
  30. while (status != STOP) {
  31. while (status == PAUSE) {
  32. std::this_thread::yield();
  33. }
  34. Task task;
  35. {
  36. std::unique_lock<std::mutex> locker(_mutex);
  37. _cond.wait(locker, [this]{
  38. return status == STOP || !tasks.empty();
  39. });
  40. if (status == STOP) return;
  41. if (!tasks.empty()) {
  42. --idle_num;
  43. task = std::move(tasks.front());
  44. tasks.pop();
  45. }
  46. }
  47. task();
  48. ++idle_num;
  49. }
  50. }));
  51. }
  52. }
  53. return 0;
  54. }
  55. int stop() {
  56. if (status != STOP) {
  57. status = STOP;
  58. _cond.notify_all();
  59. for (auto& worker : workers) {
  60. worker.join();
  61. }
  62. }
  63. return 0;
  64. }
  65. int pause() {
  66. if (status == RUNNING) {
  67. status = PAUSE;
  68. }
  69. return 0;
  70. }
  71. int resume() {
  72. if (status == PAUSE) {
  73. status = RUNNING;
  74. }
  75. return 0;
  76. }
  77. int wait() {
  78. while (1) {
  79. if (status == STOP || (tasks.empty() && idle_num == pool_size)) {
  80. break;
  81. }
  82. std::this_thread::yield();
  83. }
  84. return 0;
  85. }
  86. // return a future, calling future.get() will wait task done and return RetType.
  87. // commit(fn, args...)
  88. // commit(std::bind(&Class::mem_fn, &obj))
  89. // commit(std::mem_fn(&Class::mem_fn, &obj))
  90. template<class Fn, class... Args>
  91. auto commit(Fn&& fn, Args&&... args) -> std::future<decltype(fn(args...))> {
  92. using RetType = decltype(fn(args...));
  93. auto task = std::make_shared<std::packaged_task<RetType()> >(
  94. std::bind(std::forward<Fn>(fn), std::forward<Args>(args)...));
  95. std::future<RetType> future = task->get_future();
  96. {
  97. std::lock_guard<std::mutex> locker(_mutex);
  98. tasks.emplace([task]{
  99. (*task)();
  100. });
  101. }
  102. _cond.notify_one();
  103. return future;
  104. }
  105. public:
  106. enum Status {
  107. STOP,
  108. RUNNING,
  109. PAUSE,
  110. };
  111. int pool_size;
  112. std::atomic<int> idle_num;
  113. std::atomic<Status> status;
  114. std::vector<std::thread> workers;
  115. std::queue<Task> tasks;
  116. protected:
  117. std::mutex _mutex;
  118. std::condition_variable _cond;
  119. };
  120. #endif // HV_THREAD_POOL_H_