1
0

EventLoopThreadPool.h 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140
  1. #ifndef HV_EVENT_LOOP_THREAD_POOL_HPP_
  2. #define HV_EVENT_LOOP_THREAD_POOL_HPP_
  3. #include "EventLoopThread.h"
  4. #include "hbase.h"
  5. namespace hv {
  6. class EventLoopThreadPool : public Status {
  7. public:
  8. EventLoopThreadPool(int thread_num = std::thread::hardware_concurrency()) {
  9. setStatus(kInitializing);
  10. thread_num_ = thread_num;
  11. next_loop_idx_ = 0;
  12. setStatus(kInitialized);
  13. }
  14. ~EventLoopThreadPool() {
  15. stop();
  16. join();
  17. }
  18. int threadNum() {
  19. return thread_num_;
  20. }
  21. void setThreadNum(int num) {
  22. thread_num_ = num;
  23. }
  24. EventLoopPtr nextLoop(load_balance_e lb = LB_RoundRobin) {
  25. int numLoops = loop_threads_.size();
  26. if (numLoops == 0) return NULL;
  27. int idx = 0;
  28. if (lb == LB_RoundRobin) {
  29. if (++next_loop_idx_ >= numLoops) next_loop_idx_ = 0;
  30. idx = next_loop_idx_;
  31. } else if (lb == LB_Random) {
  32. idx = hv_rand(0, numLoops - 1);
  33. } else if (lb == LB_LeastConnections) {
  34. for (int i = 1; i < numLoops; ++i) {
  35. if (loop_threads_[i]->loop()->connectionNum < loop_threads_[idx]->loop()->connectionNum) {
  36. idx = i;
  37. }
  38. }
  39. } else {
  40. // Not Implemented
  41. }
  42. return loop_threads_[idx]->loop();
  43. }
  44. EventLoopPtr loop(int idx = -1) {
  45. if (idx >= 0 && idx < loop_threads_.size()) {
  46. return loop_threads_[idx]->loop();
  47. }
  48. return nextLoop();
  49. }
  50. hloop_t* hloop(int idx = -1) {
  51. EventLoopPtr ptr = loop(idx);
  52. return ptr ? ptr->loop() : NULL;
  53. }
  54. // @param wait_threads_started: if ture this method will block until all loop_threads started.
  55. // @param pre: This functor will be executed when loop_thread started.
  56. // @param post:This Functor will be executed when loop_thread stopped.
  57. void start(bool wait_threads_started = false,
  58. std::function<void(const EventLoopPtr&)> pre = NULL,
  59. std::function<void(const EventLoopPtr&)> post = NULL) {
  60. if (thread_num_ == 0) return;
  61. if (status() >= kStarting && status() < kStopped) return;
  62. setStatus(kStarting);
  63. std::shared_ptr<std::atomic<int>> started_cnt(new std::atomic<int>(0));
  64. std::shared_ptr<std::atomic<int>> exited_cnt(new std::atomic<int>(0));
  65. loop_threads_.clear();
  66. for (int i = 0; i < thread_num_; ++i) {
  67. EventLoopThreadPtr loop_thread(new EventLoopThread);
  68. const EventLoopPtr& loop = loop_thread->loop();
  69. loop_thread->start(false,
  70. [this, started_cnt, pre, &loop]() {
  71. if (++(*started_cnt) == thread_num_) {
  72. setStatus(kRunning);
  73. }
  74. if (pre) pre(loop);
  75. return 0;
  76. },
  77. [this, exited_cnt, post, &loop]() {
  78. if (post) post(loop);
  79. if (++(*exited_cnt) == thread_num_) {
  80. setStatus(kStopped);
  81. }
  82. return 0;
  83. }
  84. );
  85. loop_threads_.push_back(loop_thread);
  86. }
  87. if (wait_threads_started) {
  88. while (status() < kRunning) {
  89. hv_delay(1);
  90. }
  91. }
  92. }
  93. // @param wait_threads_started: if ture this method will block until all loop_threads stopped.
  94. // stop thread-safe
  95. void stop(bool wait_threads_stopped = false) {
  96. if (status() < kStarting || status() >= kStopping) return;
  97. setStatus(kStopping);
  98. for (auto& loop_thread : loop_threads_) {
  99. loop_thread->stop(false);
  100. }
  101. if (wait_threads_stopped) {
  102. while (!isStopped()) {
  103. hv_delay(1);
  104. }
  105. }
  106. }
  107. // @brief join all loop_threads
  108. // @note destructor will join loop_threads if you forget to call this method.
  109. void join() {
  110. for (auto& loop_thread : loop_threads_) {
  111. loop_thread->join();
  112. }
  113. }
  114. private:
  115. int thread_num_;
  116. std::vector<EventLoopThreadPtr> loop_threads_;
  117. std::atomic<unsigned int> next_loop_idx_;
  118. };
  119. }
  120. #endif // HV_EVENT_LOOP_THREAD_POOL_HPP_