hthreadpool.h 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138
  1. #ifndef HV_THREAD_POOL_H_
  2. #define HV_THREAD_POOL_H_
  3. #include <vector>
  4. #include <thread>
  5. #include <queue>
  6. #include <functional>
  7. #include <atomic>
  8. #include <mutex>
  9. #include <condition_variable>
  10. #include <future>
  11. #include <memory>
  12. #include <utility>
  13. //#include "hlog.h"
  14. #include "hthread.h"
  15. class HThreadPool {
  16. public:
  17. using Task = std::function<void()>;
  18. HThreadPool(int size = std::thread::hardware_concurrency())
  19. : pool_size(size), idle_num(size), status(STOP) {
  20. }
  21. ~HThreadPool() {
  22. stop();
  23. }
  24. int start() {
  25. if (status == STOP) {
  26. status = RUNNING;
  27. for (int i = 0; i < pool_size; ++i) {
  28. workers.emplace_back(std::thread([this]{
  29. while (status != STOP) {
  30. while (status == PAUSE) {
  31. std::this_thread::yield();
  32. }
  33. Task task;
  34. {
  35. std::unique_lock<std::mutex> locker(_mutex);
  36. _cond.wait(locker, [this]{
  37. return status == STOP || !tasks.empty();
  38. });
  39. if (status == STOP) return;
  40. if (!tasks.empty()) {
  41. --idle_num;
  42. task = std::move(tasks.front());
  43. tasks.pop();
  44. }
  45. }
  46. task();
  47. ++idle_num;
  48. }
  49. }));
  50. }
  51. }
  52. return 0;
  53. }
  54. int stop() {
  55. if (status != STOP) {
  56. status = STOP;
  57. _cond.notify_all();
  58. for (auto& worker : workers) {
  59. worker.join();
  60. }
  61. }
  62. return 0;
  63. }
  64. int pause() {
  65. if (status == RUNNING) {
  66. status = PAUSE;
  67. }
  68. return 0;
  69. }
  70. int resume() {
  71. if (status == PAUSE) {
  72. status = RUNNING;
  73. }
  74. return 0;
  75. }
  76. int wait() {
  77. while (1) {
  78. if (status == STOP || (tasks.empty() && idle_num == pool_size)) {
  79. break;
  80. }
  81. std::this_thread::yield();
  82. }
  83. return 0;
  84. }
  85. // return a future, calling future.get() will wait task done and return RetType.
  86. // commit(fn, args...)
  87. // commit(std::bind(&Class::mem_fn, &obj))
  88. // commit(std::mem_fn(&Class::mem_fn, &obj))
  89. template<class Fn, class... Args>
  90. auto commit(Fn&& fn, Args&&... args) -> std::future<decltype(fn(args...))> {
  91. using RetType = decltype(fn(args...));
  92. auto task = std::make_shared<std::packaged_task<RetType()> >(
  93. std::bind(std::forward<Fn>(fn), std::forward<Args>(args)...));
  94. std::future<RetType> future = task->get_future();
  95. {
  96. std::lock_guard<std::mutex> locker(_mutex);
  97. tasks.emplace([task]{
  98. (*task)();
  99. });
  100. }
  101. _cond.notify_one();
  102. return future;
  103. }
  104. public:
  105. enum Status {
  106. STOP,
  107. RUNNING,
  108. PAUSE,
  109. };
  110. int pool_size;
  111. std::atomic<int> idle_num;
  112. std::atomic<Status> status;
  113. std::vector<std::thread> workers;
  114. std::queue<Task> tasks;
  115. protected:
  116. std::mutex _mutex;
  117. std::condition_variable _cond;
  118. };
  119. #endif // HV_THREAD_POOL_H_