// EventLoopThreadPool.h
  1. #ifndef HV_EVENT_LOOP_THREAD_POOL_HPP_
  2. #define HV_EVENT_LOOP_THREAD_POOL_HPP_
  3. #include "EventLoopThread.h"
  4. namespace hv {
  5. class EventLoopThreadPool : public Status {
  6. public:
  7. EventLoopThreadPool(int thread_num = std::thread::hardware_concurrency()) {
  8. setStatus(kInitializing);
  9. thread_num_ = thread_num;
  10. next_loop_idx_ = 0;
  11. setStatus(kInitialized);
  12. }
  13. ~EventLoopThreadPool() {
  14. stop();
  15. join();
  16. }
  17. int threadNum() {
  18. return thread_num_;
  19. }
  20. void setThreadNum(int num) {
  21. thread_num_ = num;
  22. }
  23. EventLoopPtr nextLoop() {
  24. if (loop_threads_.empty()) return NULL;
  25. return loop_threads_[++next_loop_idx_ % loop_threads_.size()]->loop();
  26. }
  27. EventLoopPtr loop(int idx = -1) {
  28. if (idx >= 0 && idx < loop_threads_.size()) {
  29. return loop_threads_[idx]->loop();
  30. }
  31. return nextLoop();
  32. }
  33. hloop_t* hloop(int idx = -1) {
  34. EventLoopPtr ptr = loop(idx);
  35. return ptr ? ptr->loop() : NULL;
  36. }
  37. // @param wait_threads_started: if ture this method will block until all loop_threads started.
  38. // @param pre: This functor will be executed when loop_thread started.
  39. // @param post:This Functor will be executed when loop_thread stopped.
  40. void start(bool wait_threads_started = false,
  41. std::function<void(const EventLoopPtr&)> pre = NULL,
  42. std::function<void(const EventLoopPtr&)> post = NULL) {
  43. if (status() >= kStarting && status() < kStopped) return;
  44. setStatus(kStarting);
  45. if (thread_num_ == 0) {
  46. setStatus(kRunning);
  47. return;
  48. }
  49. std::shared_ptr<std::atomic<int>> started_cnt(new std::atomic<int>(0));
  50. std::shared_ptr<std::atomic<int>> exited_cnt(new std::atomic<int>(0));
  51. loop_threads_.clear();
  52. for (int i = 0; i < thread_num_; ++i) {
  53. EventLoopThreadPtr loop_thread(new EventLoopThread);
  54. const EventLoopPtr& loop = loop_thread->loop();
  55. loop_thread->start(false,
  56. [this, started_cnt, pre, &loop]() {
  57. if (++(*started_cnt) == thread_num_) {
  58. setStatus(kRunning);
  59. }
  60. if (pre) pre(loop);
  61. return 0;
  62. },
  63. [this, exited_cnt, post, &loop]() {
  64. if (post) post(loop);
  65. if (++(*exited_cnt) == thread_num_) {
  66. setStatus(kStopped);
  67. }
  68. return 0;
  69. }
  70. );
  71. loop_threads_.push_back(loop_thread);
  72. }
  73. if (wait_threads_started) {
  74. while (status() < kRunning) {
  75. hv_delay(1);
  76. }
  77. }
  78. }
  79. // @param wait_threads_started: if ture this method will block until all loop_threads stopped.
  80. void stop(bool wait_threads_stopped = false) {
  81. if (status() < kStarting || status() >= kStopping) return;
  82. setStatus(kStopping);
  83. for (auto& loop_thread : loop_threads_) {
  84. loop_thread->stop(false);
  85. }
  86. if (wait_threads_stopped) {
  87. while (!isStopped()) {
  88. hv_delay(1);
  89. }
  90. }
  91. }
  92. // @brief join all loop_threads
  93. // @note destructor will join loop_threads if you forget to call this method.
  94. void join() {
  95. for (auto& loop_thread : loop_threads_) {
  96. loop_thread->join();
  97. }
  98. }
  99. private:
  100. int thread_num_;
  101. std::vector<EventLoopThreadPtr> loop_threads_;
  102. std::atomic<unsigned int> next_loop_idx_;
  103. };
  104. }
  105. #endif // HV_EVENT_LOOP_THREAD_POOL_HPP_