EventLoopThread.h 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121
  1. #ifndef HV_EVENT_LOOP_THREAD_HPP_
  2. #define HV_EVENT_LOOP_THREAD_HPP_
  3. #include <thread>
  4. #include "hlog.h"
  5. #include "EventLoop.h"
  6. namespace hv {
  7. class EventLoopThread : public Status {
  8. public:
  9. // Return 0 means OK, other failed.
  10. typedef std::function<int()> Functor;
  11. EventLoopThread(EventLoopPtr loop = NULL) {
  12. setStatus(kInitializing);
  13. if (loop) {
  14. loop_ = loop;
  15. } else {
  16. loop_.reset(new EventLoop);
  17. }
  18. setStatus(kInitialized);
  19. }
  20. ~EventLoopThread() {
  21. stop();
  22. join();
  23. }
  24. const EventLoopPtr& loop() {
  25. return loop_;
  26. }
  27. hloop_t* hloop() {
  28. return loop_->loop();
  29. }
  30. bool isRunning() {
  31. return loop_->isRunning();
  32. }
  33. // @param wait_thread_started: if ture this method will block until loop_thread started.
  34. // @param pre: This functor will be executed when loop_thread started.
  35. // @param post:This Functor will be executed when loop_thread stopped.
  36. void start(bool wait_thread_started = true,
  37. Functor pre = Functor(),
  38. Functor post = Functor()) {
  39. if (status() >= kStarting && status() < kStopped) return;
  40. setStatus(kStarting);
  41. thread_.reset(new std::thread(&EventLoopThread::loop_thread, this, pre, post));
  42. if (wait_thread_started) {
  43. while (loop_->status() < kRunning) {
  44. hv_delay(1);
  45. }
  46. }
  47. }
  48. // @param wait_thread_started: if ture this method will block until loop_thread stopped.
  49. // stop thread-safe
  50. void stop(bool wait_thread_stopped = false) {
  51. if (status() < kStarting || status() >= kStopping) return;
  52. setStatus(kStopping);
  53. long loop_tid = loop_->tid();
  54. loop_->stop();
  55. if (wait_thread_stopped) {
  56. if (hv_gettid() == loop_tid) return;
  57. while (!isStopped()) {
  58. hv_delay(1);
  59. }
  60. }
  61. }
  62. // @brief join loop_thread
  63. // @note destructor will join loop_thread if you forget to call this method.
  64. void join() {
  65. if (thread_ && thread_->joinable()) {
  66. thread_->join();
  67. thread_ = NULL;
  68. }
  69. }
  70. private:
  71. void loop_thread(const Functor& pre, const Functor& post) {
  72. hlogi("EventLoopThread started, tid=%ld", hv_gettid());
  73. setStatus(kStarted);
  74. if (pre) {
  75. loop_->queueInLoop([this, pre]{
  76. if (pre() != 0) {
  77. loop_->stop();
  78. }
  79. });
  80. }
  81. loop_->run();
  82. assert(loop_->isStopped());
  83. if (post) {
  84. post();
  85. }
  86. setStatus(kStopped);
  87. hlogi("EventLoopThread stopped, tid=%ld", hv_gettid());
  88. }
  89. private:
  90. EventLoopPtr loop_;
  91. std::shared_ptr<std::thread> thread_;
  92. };
  93. typedef std::shared_ptr<EventLoopThread> EventLoopThreadPtr;
  94. }
  95. #endif // HV_EVENT_LOOP_THREAD_HPP_