|
@@ -1,23 +1,15 @@
|
|
|
#ifndef HV_EVENT_LOOP_THREAD_POOL_HPP_
|
|
#ifndef HV_EVENT_LOOP_THREAD_POOL_HPP_
|
|
|
#define HV_EVENT_LOOP_THREAD_POOL_HPP_
|
|
#define HV_EVENT_LOOP_THREAD_POOL_HPP_
|
|
|
|
|
|
|
|
-#include <thread>
|
|
|
|
|
-
|
|
|
|
|
#include "EventLoopThread.h"
|
|
#include "EventLoopThread.h"
|
|
|
|
|
|
|
|
namespace hv {
|
|
namespace hv {
|
|
|
|
|
|
|
|
class EventLoopThreadPool : public Status {
|
|
class EventLoopThreadPool : public Status {
|
|
|
public:
|
|
public:
|
|
|
- EventLoopThreadPool(EventLoopPtr master_loop = NULL,
|
|
|
|
|
- int worker_threads = std::thread::hardware_concurrency()) {
|
|
|
|
|
|
|
+ EventLoopThreadPool(int thread_num = std::thread::hardware_concurrency()) {
|
|
|
setStatus(kInitializing);
|
|
setStatus(kInitializing);
|
|
|
- if (master_loop) {
|
|
|
|
|
- master_loop_ = master_loop;
|
|
|
|
|
- } else {
|
|
|
|
|
- master_loop_.reset(new EventLoop);
|
|
|
|
|
- }
|
|
|
|
|
- thread_num_ = worker_threads;
|
|
|
|
|
|
|
+ thread_num_ = thread_num;
|
|
|
next_loop_idx_ = 0;
|
|
next_loop_idx_ = 0;
|
|
|
setStatus(kInitialized);
|
|
setStatus(kInitialized);
|
|
|
}
|
|
}
|
|
@@ -27,28 +19,37 @@ public:
|
|
|
join();
|
|
join();
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- int thread_num() {
|
|
|
|
|
|
|
+ int threadNum() {
|
|
|
return thread_num_;
|
|
return thread_num_;
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- EventLoopPtr loop() {
|
|
|
|
|
- return master_loop_;
|
|
|
|
|
|
|
+ void setThreadNum(int num) {
|
|
|
|
|
+ thread_num_ = num;
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- hloop_t* hloop() {
|
|
|
|
|
- return master_loop_->loop();
|
|
|
|
|
|
|
+ EventLoopPtr nextLoop() {
|
|
|
|
|
+ if (loop_threads_.empty()) return NULL;
|
|
|
|
|
+ return loop_threads_[++next_loop_idx_ % loop_threads_.size()]->loop();
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- EventLoopPtr nextLoop() {
|
|
|
|
|
- if (isRunning() && !worker_threads_.empty()) {
|
|
|
|
|
- return worker_threads_[++next_loop_idx_ % worker_threads_.size()]->loop();
|
|
|
|
|
- } else {
|
|
|
|
|
- return master_loop_;
|
|
|
|
|
|
|
+ EventLoopPtr loop(int idx = -1) {
|
|
|
|
|
+ if (idx >= 0 && idx < loop_threads_.size()) {
|
|
|
|
|
+ return loop_threads_[idx]->loop();
|
|
|
}
|
|
}
|
|
|
|
|
+ return nextLoop();
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- // @param wait_threads_started: if ture this method will block until all worker_threads started.
|
|
|
|
|
- void start(bool wait_threads_started = false) {
|
|
|
|
|
|
|
+ hloop_t* hloop(int idx = -1) {
|
|
|
|
|
+ EventLoopPtr ptr = loop(idx);
|
|
|
|
|
+ return ptr ? ptr->loop() : NULL;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ // @param wait_threads_started: if ture this method will block until all loop_threads started.
|
|
|
|
|
+ // @param pre: This functor will be executed when loop_thread started.
|
|
|
|
|
+ // @param post:This Functor will be executed when loop_thread stopped.
|
|
|
|
|
+ void start(bool wait_threads_started = false,
|
|
|
|
|
+ std::function<void(const EventLoopPtr&)> pre = NULL,
|
|
|
|
|
+ std::function<void(const EventLoopPtr&)> post = NULL) {
|
|
|
setStatus(kStarting);
|
|
setStatus(kStarting);
|
|
|
|
|
|
|
|
if (thread_num_ == 0) {
|
|
if (thread_num_ == 0) {
|
|
@@ -60,21 +61,25 @@ public:
|
|
|
std::shared_ptr<std::atomic<int>> exited_cnt(new std::atomic<int>(0));
|
|
std::shared_ptr<std::atomic<int>> exited_cnt(new std::atomic<int>(0));
|
|
|
|
|
|
|
|
for (int i = 0; i < thread_num_; ++i) {
|
|
for (int i = 0; i < thread_num_; ++i) {
|
|
|
- auto prefn = [this, started_cnt]() {
|
|
|
|
|
- if (++(*started_cnt) == thread_num_) {
|
|
|
|
|
- setStatus(kRunning);
|
|
|
|
|
- }
|
|
|
|
|
- return 0;
|
|
|
|
|
- };
|
|
|
|
|
- auto postfn = [this, exited_cnt]() {
|
|
|
|
|
- if (++(*exited_cnt) == thread_num_) {
|
|
|
|
|
- setStatus(kStopped);
|
|
|
|
|
|
|
+ EventLoopThreadPtr loop_thread(new EventLoopThread);
|
|
|
|
|
+ EventLoopPtr loop = loop_thread->loop();
|
|
|
|
|
+ loop_thread->start(false,
|
|
|
|
|
+ [this, started_cnt, pre, &loop]() {
|
|
|
|
|
+ if (++(*started_cnt) == thread_num_) {
|
|
|
|
|
+ setStatus(kRunning);
|
|
|
|
|
+ }
|
|
|
|
|
+ if (pre) pre(loop);
|
|
|
|
|
+ return 0;
|
|
|
|
|
+ },
|
|
|
|
|
+ [this, exited_cnt, post, &loop]() {
|
|
|
|
|
+ if (post) post(loop);
|
|
|
|
|
+ if (++(*exited_cnt) == thread_num_) {
|
|
|
|
|
+ setStatus(kStopped);
|
|
|
|
|
+ }
|
|
|
|
|
+ return 0;
|
|
|
}
|
|
}
|
|
|
- return 0;
|
|
|
|
|
- };
|
|
|
|
|
- EventLoopThreadPtr worker(new EventLoopThread());
|
|
|
|
|
- worker->start(false, prefn, postfn);
|
|
|
|
|
- worker_threads_.push_back(worker);
|
|
|
|
|
|
|
+ );
|
|
|
|
|
+ loop_threads_.push_back(loop_thread);
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
if (wait_threads_started) {
|
|
if (wait_threads_started) {
|
|
@@ -84,12 +89,12 @@ public:
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- // @param wait_threads_started: if ture this method will block until all worker_threads stopped.
|
|
|
|
|
|
|
+ // @param wait_threads_started: if ture this method will block until all loop_threads stopped.
|
|
|
void stop(bool wait_threads_stopped = false) {
|
|
void stop(bool wait_threads_stopped = false) {
|
|
|
setStatus(kStopping);
|
|
setStatus(kStopping);
|
|
|
|
|
|
|
|
- for (auto& worker : worker_threads_) {
|
|
|
|
|
- worker->stop(false);
|
|
|
|
|
|
|
+ for (auto& loop_thread : loop_threads_) {
|
|
|
|
|
+ loop_thread->stop(false);
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
if (wait_threads_stopped) {
|
|
if (wait_threads_stopped) {
|
|
@@ -99,18 +104,17 @@ public:
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- // @brief join all worker_threads
|
|
|
|
|
- // @note destructor will join worker_threads if you forget to call this method.
|
|
|
|
|
|
|
+ // @brief join all loop_threads
|
|
|
|
|
+ // @note destructor will join loop_threads if you forget to call this method.
|
|
|
void join() {
|
|
void join() {
|
|
|
- for (auto& worker : worker_threads_) {
|
|
|
|
|
- worker->join();
|
|
|
|
|
|
|
+ for (auto& loop_thread : loop_threads_) {
|
|
|
|
|
+ loop_thread->join();
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
private:
|
|
private:
|
|
|
- EventLoopPtr master_loop_;
|
|
|
|
|
int thread_num_;
|
|
int thread_num_;
|
|
|
- std::vector<EventLoopThreadPtr> worker_threads_;
|
|
|
|
|
|
|
+ std::vector<EventLoopThreadPtr> loop_threads_;
|
|
|
std::atomic<unsigned int> next_loop_idx_;
|
|
std::atomic<unsigned int> next_loop_idx_;
|
|
|
};
|
|
};
|
|
|
|
|
|