Browse Source

Add evpp: EventLoop EventLoopThread EventLoopThreadPool

ithewei 5 years ago
parent
commit
4f9d8e9627

+ 6 - 0
event/hevent.c

@@ -1,5 +1,11 @@
 #include "hevent.h"
 #include "hsocket.h"
+#include "hatomic.h"
+
+uint64_t hloop_next_event_id() {
+    static hatomic_t s_id = HATOMIC_VAR_INIT(0);
+    return ++s_id;
+}
 
 int hio_fd(hio_t* io) {
     return io->fd;

+ 3 - 3
event/hevent.h

@@ -3,7 +3,6 @@
 
 #include "hloop.h"
 #include "hbuf.h"
-#include "hatomic.h"
 #include "hmutex.h"
 
 #include "array.h"
@@ -29,7 +28,6 @@ struct hloop_s {
     void*       userdata;
 //private:
     // events
-    hatomic_t                   event_counter;
     uint32_t                    nactives;
     uint32_t                    npendings;
     // pendings: with priority as array.index
@@ -52,6 +50,8 @@ struct hloop_s {
     hmutex_t                    custom_events_mutex;
 };
 
+uint64_t hloop_next_event_id();
+
 struct hidle_s {
     HEVENT_FIELDS
     uint32_t    repeat;
@@ -171,7 +171,7 @@ void hio_free(hio_t* io);
 #define EVENT_ADD(loop, ev, cb) \
     do {\
         ev->loop = loop;\
-        ev->event_id = ++loop->event_counter;\
+        ev->event_id = hloop_next_event_id();\
         ev->cb = (hevent_cb)cb;\
         EVENT_ACTIVE(ev);\
     } while(0)

+ 1 - 1
event/hloop.c

@@ -213,7 +213,7 @@ void hloop_post_event(hloop_t* loop, hevent_t* ev) {
         ev->event_type = HEVENT_TYPE_CUSTOM;
     }
     if (ev->event_id == 0) {
-        ev->event_id = ++loop->event_counter;
+        ev->event_id = hloop_next_event_id();
     }
 
     hmutex_lock(&loop->custom_events_mutex);

+ 47 - 15
evpp/EventLoop.h

@@ -9,50 +9,74 @@
 #include "hloop.h"
 #include "hthread.h"
 
+#include "Status.h"
 #include "Event.h"
 
 namespace hv {
 
-class EventLoop {
+class EventLoop : public Status {
 public:
+
     typedef std::function<void()> Functor;
 
-    EventLoop() {
-        loop_ = hloop_new(HLOOP_FLAG_AUTO_FREE);
-        assert(loop_ != NULL);
-        hloop_set_userdata(loop_, this);
+    // New an EventLoop using an existing hloop_t object,
+    // so we can embed an EventLoop object into the old application based on hloop.
+    // NOTE: Be careful to deal with destroy of hloop_t.
+    EventLoop(hloop_t* loop = NULL) {
+        setStatus(kInitializing);
+        if (loop) {
+            loop_ = loop;
+        } else {
+            loop_ = hloop_new(HLOOP_FLAG_AUTO_FREE);
+        }
+        setStatus(kInitialized);
     }
 
     ~EventLoop() {
         stop();
     }
 
-    void start() {
+    hloop_t* loop() {
+        return loop_;
+    }
+
+    // @brief Run loop forever
+    void run() {
         if (loop_ == NULL) return;
+        setStatus(kRunning);
         hloop_run(loop_);
+        setStatus(kStopped);
     }
 
     void stop() {
         if (loop_ == NULL) return;
+        setStatus(kStopping);
         hloop_stop(loop_);
         loop_ = NULL;
     }
 
     void pause() {
         if (loop_ == NULL) return;
-        hloop_pause(loop_);
+        if (isRunning()) {
+            hloop_pause(loop_);
+            setStatus(kPause);
+        }
     }
 
     void resume() {
         if (loop_ == NULL) return;
-        hloop_resume(loop_);
+        if (isPause()) {
+            hloop_resume(loop_);
+            setStatus(kRunning);
+        }
     }
 
+    // Timer interfaces: setTimer, killTimer, resetTimer
     TimerID setTimer(int timeout_ms, TimerCallback cb, int repeat = INFINITE) {
         htimer_t* htimer = htimer_add(loop_, onTimer, timeout_ms, repeat);
 
         Timer timer(htimer, cb, repeat);
-        hevent_set_userdata(htimer, &timer);
+        hevent_set_userdata(htimer, this);
 
         TimerID timerID = hevent_id(htimer);
 
@@ -92,16 +116,21 @@ public:
         }
     }
 
-    bool isInLoop() {
+    long tid() {
+        if (loop_ == NULL) hv_gettid();
+        return hloop_tid(loop_);
+    }
+
+    bool isInLoopThread() {
         return hv_gettid() == hloop_tid(loop_);
     }
 
-    void assertInLoop() {
-        assert(isInLoop());
+    void assertInLoopThread() {
+        assert(isInLoopThread());
     }
 
     void runInLoop(Functor fn) {
-        if (isInLoop()) {
+        if (isInLoopThread()) {
             if (fn) fn();
         } else {
             queueInLoop(fn);
@@ -118,6 +147,7 @@ public:
         if (loop_ == NULL) return;
 
         EventPtr ev(new Event(cb));
+        hevent_set_userdata(&ev->event, this);
         ev->event.cb = onCustomEvent;
 
         mutex_.lock();
@@ -129,8 +159,8 @@ public:
 
 private:
     static void onTimer(htimer_t* htimer) {
+        EventLoop* loop = (EventLoop*)hevent_userdata(htimer);
         hloop_t* hloop = (hloop_t*)hevent_loop(htimer);
-        EventLoop* loop = (EventLoop*)hloop_userdata(hloop);
 
         TimerID timerID = hevent_id(htimer);
         TimerCallback cb = NULL;
@@ -161,8 +191,8 @@ private:
     }
 
     static void onCustomEvent(hevent_t* hev) {
+        EventLoop* loop = (EventLoop*)hevent_userdata(hev);
         hloop_t* hloop = (hloop_t*)hevent_loop(hev);
-        EventLoop* loop = (EventLoop*)hloop_userdata(hloop);
 
         loop->mutex_.lock();
         EventPtr ev = loop->customEvents.front();
@@ -179,6 +209,8 @@ private:
     std::map<TimerID, Timer>    timers;         // GUAREDE_BY(mutex_)
 };
 
+typedef std::shared_ptr<EventLoop> EventLoopPtr;
+
 }
 
 #endif // HV_EVENT_LOOP_HPP_

+ 114 - 0
evpp/EventLoopThread.h

@@ -0,0 +1,114 @@
+#ifndef HV_EVENT_LOOP_THREAD_HPP_
+#define HV_EVENT_LOOP_THREAD_HPP_
+
+#include <thread>
+
+#include "EventLoop.h"
+
+namespace hv {
+
+class EventLoopThread : public Status {
+public:
+    // Return 0 means OK, other failed.
+    typedef std::function<int()> Functor;
+
+    EventLoopThread(EventLoopPtr loop = NULL) {
+        setStatus(kInitializing);
+        if (loop) {
+            loop_ = loop;
+        } else {
+            loop_.reset(new EventLoop);
+        }
+        setStatus(kInitialized);
+    }
+
+    ~EventLoopThread() {
+        stop();
+        join();
+    }
+
+    EventLoopPtr loop() {
+        return loop_;
+    }
+
+    hloop_t* hloop() {
+        return loop_->loop();
+    }
+
+    bool isRunning() {
+        return loop_->isRunning();
+    }
+
+    // @param wait_thread_started: if ture this method will block until loop_thread started.
+    // @param pre: This functor will be executed when loop_thread started.
+    // @param post:This Functor will be executed when loop_thread stopped.
+    void start(bool wait_thread_started = true,
+               Functor pre = Functor(),
+               Functor post = Functor()) {
+        setStatus(kStarting);
+
+        assert(thread_.get() == NULL);
+        thread_.reset(new std::thread(&EventLoopThread::loop_thread, this, pre, post));
+
+        if (wait_thread_started) {
+            while (loop_->status() < kRunning) {
+                hv_delay(1);
+            }
+        }
+    }
+
+    // @param wait_thread_started: if ture this method will block until loop_thread stopped.
+    void stop(bool wait_thread_stopped = false) {
+        if (status() >= kStopping) return;
+        setStatus(kStopping);
+
+        loop_->stop();
+
+        if (wait_thread_stopped) {
+            while (!isStopped()) {
+                hv_delay(1);
+            }
+        }
+    }
+
+    // @brief join loop_thread
+    // @note  destructor will join loop_thread if you forget to call this method.
+    void join() {
+        if (thread_ && thread_->joinable()) {
+            thread_->join();
+            thread_ = NULL;
+        }
+    }
+
+private:
+    void loop_thread(const Functor& pre, const Functor& post) {
+        setStatus(kStarted);
+
+        if (pre) {
+            loop_->queueInLoop([this, pre]{
+                if (pre() != 0) {
+                    loop_->stop();
+                }
+            });
+        }
+
+        loop_->run();
+        assert(loop_->isStopped());
+
+        if (post) {
+            post();
+        }
+
+        setStatus(kStopped);
+    }
+
+private:
+    EventLoopPtr                 loop_;
+    std::shared_ptr<std::thread> thread_;
+};
+
+typedef std::shared_ptr<EventLoopThread> EventLoopThreadPtr;
+
+}
+
+#endif // HV_EVENT_LOOP_THREAD_HPP_

+ 119 - 0
evpp/EventLoopThreadPool.h

@@ -0,0 +1,119 @@
+#ifndef HV_EVENT_LOOP_THREAD_POOL_HPP_
+#define HV_EVENT_LOOP_THREAD_POOL_HPP_
+
+#include <thread>
+
+#include "EventLoopThread.h"
+
+namespace hv {
+
+class EventLoopThreadPool : public Status {
+public:
+    EventLoopThreadPool(EventLoopPtr master_loop = NULL,
+                        int worker_threads = std::thread::hardware_concurrency()) {
+        setStatus(kInitializing);
+        if (master_loop) {
+            master_loop_ = master_loop;
+        } else {
+            master_loop_.reset(new EventLoop);
+        }
+        thread_num_ = worker_threads;
+        next_loop_idx_ = 0;
+        setStatus(kInitialized);
+    }
+
+    ~EventLoopThreadPool() {
+        stop();
+        join();
+    }
+
+    int thread_num() {
+        return thread_num_;
+    }
+
+    EventLoopPtr loop() {
+        return master_loop_;
+    }
+
+    hloop_t* hloop() {
+        return master_loop_->loop();
+    }
+
+    EventLoopPtr nextLoop() {
+        if (isRunning() && !worker_threads_.empty()) {
+            return worker_threads_[++next_loop_idx_ % worker_threads_.size()]->loop();
+        } else {
+            return master_loop_;
+        }
+    }
+
+    // @param wait_threads_started: if ture this method will block until all worker_threads started.
+    void start(bool wait_threads_started = false) {
+        setStatus(kStarting);
+
+        if (thread_num_ == 0) {
+            setStatus(kRunning);
+            return;
+        }
+
+        std::shared_ptr<std::atomic<int>> started_cnt(new std::atomic<int>(0));
+        std::shared_ptr<std::atomic<int>> exited_cnt(new std::atomic<int>(0));
+
+        for (int i = 0; i < thread_num_; ++i) {
+            auto prefn = [this, started_cnt]() {
+                if (++(*started_cnt) == thread_num_) {
+                    setStatus(kRunning);
+                }
+                return 0;
+            };
+            auto postfn = [this, exited_cnt]() {
+                if (++(*exited_cnt) == thread_num_) {
+                    setStatus(kStopped);
+                }
+                return 0;
+            };
+            EventLoopThreadPtr worker(new EventLoopThread());
+            worker->start(false, prefn, postfn);
+            worker_threads_.push_back(worker);
+        }
+
+        if (wait_threads_started) {
+            while (status() < kRunning) {
+                hv_delay(1);
+            }
+        }
+    }
+
+    // @param wait_threads_started: if ture this method will block until all worker_threads stopped.
+    void stop(bool wait_threads_stopped = false) {
+        setStatus(kStopping);
+
+        for (auto& worker : worker_threads_) {
+            worker->stop(false);
+        }
+
+        if (wait_threads_stopped) {
+            while (!isStopped()) {
+                hv_delay(1);
+            }
+        }
+    }
+
+    // @brief join all worker_threads
+    // @note  destructor will join worker_threads if you forget to call this method.
+    void join() {
+        for (auto& worker : worker_threads_) {
+            worker->join();
+        }
+    }
+
+private:
+    EventLoopPtr                                master_loop_;
+    int                                         thread_num_;
+    std::vector<EventLoopThreadPtr>             worker_threads_;
+    std::atomic<unsigned int>                   next_loop_idx_;
+};
+
+}
+
+#endif // HV_EVENT_LOOP_THREAD_POOL_HPP_

+ 63 - 0
evpp/EventLoopThreadPool_test.cpp

@@ -0,0 +1,63 @@
+/*
+ * EventLoopThreadPool_test.cpp
+ *
+ * @build
+ * make libhv && sudo make install
+ * g++ -std=c++11 EventLoopThreadPool_test.cpp -o EventLoopThreadPool_test -I/usr/local/include/hv -lhv -lpthread
+ *
+ */
+
+#include "hv.h"
+
+#include "EventLoopThreadPool.h"
+
+using namespace hv;
+
+static void onTimer(TimerID timerID, int n) {
+    printf("tid=%ld timerID=%lu time=%lus n=%d\n", hv_gettid(), (unsigned long)timerID, (unsigned long)time(NULL), n);
+}
+
+int main(int argc, char* argv[]) {
+    HV_MEMCHECK;
+
+    printf("main tid=%ld\n", hv_gettid());
+
+    EventLoopPtr master_loop(new EventLoop);
+    EventLoopThreadPool loop_threads(master_loop, 4);
+
+    loop_threads.start(true);
+
+    int thread_num = loop_threads.thread_num();
+    for (int i = 0; i < thread_num; ++i) {
+        EventLoopPtr loop = loop_threads.nextLoop();
+        printf("worker[%d] tid=%ld\n", i, loop->tid());
+
+        loop->runInLoop([loop](){
+            // runEvery 1s
+            loop->setInterval(1000, std::bind(onTimer, std::placeholders::_1, 100));
+        });
+
+        loop->queueInLoop([](){
+            printf("queueInLoop tid=%ld\n", hv_gettid());
+        });
+
+        loop->runInLoop([](){
+            printf("runInLoop tid=%ld\n", hv_gettid());
+        });
+    }
+
+    // runAfter 10s
+    master_loop->setTimeout(10000, [&loop_threads](TimerID timerID){
+        EventLoopPtr master_loop = loop_threads.loop();
+        master_loop->stop();
+        loop_threads.stop(false);
+    });
+
+    // master_loop run in main thread
+    master_loop->run();
+
+    // wait loop_threads exit
+    loop_threads.join();
+
+    return 0;
+}

+ 50 - 0
evpp/EventLoopThread_test.cpp

@@ -0,0 +1,50 @@
+/*
+ * EventLoopThread_test.cpp
+ *
+ * @build
+ * make libhv && sudo make install
+ * g++ -std=c++11 EventLoopThread_test.cpp -o EventLoopThread_test -I/usr/local/include/hv -lhv -lpthread
+ *
+ */
+
+#include "hv.h"
+
+#include "EventLoopThread.h"
+
+using namespace hv;
+
+static void onTimer(TimerID timerID, int n) {
+    printf("tid=%ld timerID=%lu time=%lus n=%d\n", hv_gettid(), (unsigned long)timerID, (unsigned long)time(NULL), n);
+}
+
+int main(int argc, char* argv[]) {
+    HV_MEMCHECK;
+
+    printf("main tid=%ld\n", hv_gettid());
+
+    EventLoopThread loop_thread;
+    EventLoopPtr loop = loop_thread.loop();
+
+    // runEvery 1s
+    loop->setInterval(1000, std::bind(onTimer, std::placeholders::_1, 100));
+
+    // runAfter 10s
+    loop->setTimeout(10000, [&loop](TimerID timerID){
+        loop->stop();
+    });
+
+    loop_thread.start();
+
+    loop->queueInLoop([](){
+        printf("queueInLoop tid=%ld\n", hv_gettid());
+    });
+
+    loop->runInLoop([](){
+        printf("runInLoop tid=%ld\n", hv_gettid());
+    });
+
+    // wait loop_thread exit
+    loop_thread.join();
+
+    return 0;
+}

+ 15 - 24
evpp/EventLoop_test.cpp

@@ -7,50 +7,41 @@
  *
  */
 
-#include "hbase.h" // import HV_MEMCHECK
+#include "hv.h"
 
 #include "EventLoop.h"
 
 using namespace hv;
 
-static void onTimer(TimerID timerID, EventLoop* loop, int n) {
-    static int cnt = 0;
-    printf("n=%d timerID=%lu time=%lus\n", n, (unsigned long)timerID, (unsigned long)time(NULL));
-    if (++cnt == 15) {
-        printf("killTimer(%ld)\n", (unsigned long)timerID);
-        loop->killTimer(timerID);
-    }
+static void onTimer(TimerID timerID, int n) {
+    printf("tid=%ld timerID=%lu time=%lus n=%d\n", hv_gettid(), (unsigned long)timerID, (unsigned long)time(NULL), n);
 }
 
 int main(int argc, char* argv[]) {
     HV_MEMCHECK;
 
-    EventLoop loop;
+    printf("main tid=%ld\n", hv_gettid());
 
-    int n = 100;
-    loop.setInterval(1000, std::bind(onTimer, std::placeholders::_1, &loop, n));
+    EventLoopPtr loop(new EventLoop);
 
-    loop.setTimeout(10000, [&loop](TimerID timerID){
-        static int cnt = 0;
-        if (cnt++ == 0) {
-            printf("resetTimer(%ld)\n", (unsigned long)timerID);
-            loop.resetTimer(timerID);
-        } else {
-            loop.stop();
-        }
-    });
+    // runEvery 1s
+    loop->setInterval(1000, std::bind(onTimer, std::placeholders::_1, 100));
 
-    printf("tid=%ld\n", hv_gettid());
+    // runAfter 10s
+    loop->setTimeout(10000, [&loop](TimerID timerID){
+        loop->stop();
+    });
 
-    loop.queueInLoop([](){
+    loop->queueInLoop([](){
         printf("queueInLoop tid=%ld\n", hv_gettid());
     });
 
-    loop.runInLoop([](){
+    loop->runInLoop([](){
         printf("runInLoop tid=%ld\n", hv_gettid());
     });
 
-    loop.start();
+    // run until loop stopped
+    loop->run();
 
     return 0;
 }

+ 56 - 0
evpp/Status.h

@@ -0,0 +1,56 @@
+#ifndef HV_STATUS_HPP_
+#define HV_STATUS_HPP_
+
+#include <atomic>
+
+namespace hv {
+
+class Status {
+public:
+    enum KStatus {
+        kNull           = 0,
+        kInitializing   = 1,
+        kInitialized    = 2,
+        kStarting       = 3,
+        kStarted        = 4,
+        kRunning        = 5,
+        kPause          = 6,
+        kStopping       = 7,
+        kStopped        = 8,
+        kDestroyed      = 9,
+    };
+
+    Status() {
+        status_ = kNull;
+    }
+    ~Status() {
+        status_ = kDestroyed;
+    }
+
+    KStatus status() {
+        return status_;
+    }
+
+    void setStatus(KStatus status) {
+        status_ = status;
+    }
+
+    bool isRunning() {
+        return status_ == kRunning;
+    }
+
+    bool isPause() {
+        return status_ == kPause;
+    }
+
+    bool isStopped() {
+        return status_ == kStopped;
+    }
+
+private:
+    std::atomic<KStatus> status_;
+};
+
+}
+
+#endif // HV_STATUS_HPP_

+ 5 - 0
evpp/build_test.sh

@@ -0,0 +1,5 @@
+#!/bin/bash
+
+g++ -std=c++11 EventLoop_test.cpp -o EventLoop_test -I/usr/local/include/hv -lhv
+g++ -std=c++11 EventLoopThread_test.cpp -o EventLoopThread_test -I/usr/local/include/hv -lhv -lpthread
+g++ -std=c++11 EventLoopThreadPool_test.cpp -o EventLoopThreadPool_test -I/usr/local/include/hv -lhv -lpthread