EventLoopThreadPool.h 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119
  1. #ifndef HV_EVENT_LOOP_THREAD_POOL_HPP_
  2. #define HV_EVENT_LOOP_THREAD_POOL_HPP_
  3. #include <thread>
  4. #include "EventLoopThread.h"
  5. namespace hv {
  6. class EventLoopThreadPool : public Status {
  7. public:
  8. EventLoopThreadPool(EventLoopPtr master_loop = NULL,
  9. int worker_threads = std::thread::hardware_concurrency()) {
  10. setStatus(kInitializing);
  11. if (master_loop) {
  12. master_loop_ = master_loop;
  13. } else {
  14. master_loop_.reset(new EventLoop);
  15. }
  16. thread_num_ = worker_threads;
  17. next_loop_idx_ = 0;
  18. setStatus(kInitialized);
  19. }
  20. ~EventLoopThreadPool() {
  21. stop();
  22. join();
  23. }
  24. int thread_num() {
  25. return thread_num_;
  26. }
  27. EventLoopPtr loop() {
  28. return master_loop_;
  29. }
  30. hloop_t* hloop() {
  31. return master_loop_->loop();
  32. }
  33. EventLoopPtr nextLoop() {
  34. if (isRunning() && !worker_threads_.empty()) {
  35. return worker_threads_[++next_loop_idx_ % worker_threads_.size()]->loop();
  36. } else {
  37. return master_loop_;
  38. }
  39. }
  40. // @param wait_threads_started: if ture this method will block until all worker_threads started.
  41. void start(bool wait_threads_started = false) {
  42. setStatus(kStarting);
  43. if (thread_num_ == 0) {
  44. setStatus(kRunning);
  45. return;
  46. }
  47. std::shared_ptr<std::atomic<int>> started_cnt(new std::atomic<int>(0));
  48. std::shared_ptr<std::atomic<int>> exited_cnt(new std::atomic<int>(0));
  49. for (int i = 0; i < thread_num_; ++i) {
  50. auto prefn = [this, started_cnt]() {
  51. if (++(*started_cnt) == thread_num_) {
  52. setStatus(kRunning);
  53. }
  54. return 0;
  55. };
  56. auto postfn = [this, exited_cnt]() {
  57. if (++(*exited_cnt) == thread_num_) {
  58. setStatus(kStopped);
  59. }
  60. return 0;
  61. };
  62. EventLoopThreadPtr worker(new EventLoopThread());
  63. worker->start(false, prefn, postfn);
  64. worker_threads_.push_back(worker);
  65. }
  66. if (wait_threads_started) {
  67. while (status() < kRunning) {
  68. hv_delay(1);
  69. }
  70. }
  71. }
  72. // @param wait_threads_started: if ture this method will block until all worker_threads stopped.
  73. void stop(bool wait_threads_stopped = false) {
  74. setStatus(kStopping);
  75. for (auto& worker : worker_threads_) {
  76. worker->stop(false);
  77. }
  78. if (wait_threads_stopped) {
  79. while (!isStopped()) {
  80. hv_delay(1);
  81. }
  82. }
  83. }
  84. // @brief join all worker_threads
  85. // @note destructor will join worker_threads if you forget to call this method.
  86. void join() {
  87. for (auto& worker : worker_threads_) {
  88. worker->join();
  89. }
  90. }
  91. private:
  92. EventLoopPtr master_loop_;
  93. int thread_num_;
  94. std::vector<EventLoopThreadPtr> worker_threads_;
  95. std::atomic<unsigned int> next_loop_idx_;
  96. };
  97. }
  98. #endif // HV_EVENT_LOOP_THREAD_POOL_HPP_