Procházet zdrojové kódy

constructor inject EventLoopPtr

ithewei před 3 roky
rodič
revize
cae9dd3c21
7 změnil soubory, kde provedl 247 přidání a 56 odebrání
  1. 1 0
      Makefile
  2. 1 5
      evpp/EventLoopThread.h
  3. 38 14
      evpp/TcpClient.h
  4. 93 0
      evpp/TcpClientEventLoop_test.cpp
  5. 42 13
      evpp/TcpServer.h
  6. 36 12
      evpp/UdpClient.h
  7. 36 12
      evpp/UdpServer.h

+ 1 - 0
Makefile

@@ -249,6 +249,7 @@ evpp: prepare libhv
 	$(CXX) -g -Wall -O0 -std=c++11 -I. -Ibase -Issl -Ievent -Icpputil -Ievpp -o bin/TimerThread_test         evpp/TimerThread_test.cpp         -Llib -lhv -pthread
 	$(CXX) -g -Wall -O0 -std=c++11 -I. -Ibase -Issl -Ievent -Icpputil -Ievpp -o bin/TcpServer_test           evpp/TcpServer_test.cpp           -Llib -lhv -pthread
 	$(CXX) -g -Wall -O0 -std=c++11 -I. -Ibase -Issl -Ievent -Icpputil -Ievpp -o bin/TcpClient_test           evpp/TcpClient_test.cpp           -Llib -lhv -pthread
+	$(CXX) -g -Wall -O0 -std=c++11 -I. -Ibase -Issl -Ievent -Icpputil -Ievpp -o bin/TcpClientEventLoop_test  evpp/TcpClientEventLoop_test.cpp  -Llib -lhv -pthread
 	$(CXX) -g -Wall -O0 -std=c++11 -I. -Ibase -Issl -Ievent -Icpputil -Ievpp -o bin/UdpServer_test           evpp/UdpServer_test.cpp           -Llib -lhv -pthread
 	$(CXX) -g -Wall -O0 -std=c++11 -I. -Ibase -Issl -Ievent -Icpputil -Ievpp -o bin/UdpClient_test           evpp/UdpClient_test.cpp           -Llib -lhv -pthread
 

+ 1 - 5
evpp/EventLoopThread.h

@@ -16,11 +16,7 @@ public:
 
     EventLoopThread(EventLoopPtr loop = NULL) {
         setStatus(kInitializing);
-        if (loop) {
-            loop_ = loop;
-        } else {
-            loop_.reset(new EventLoop);
-        }
+        loop_ = loop ? loop : std::make_shared<EventLoop>();
         setStatus(kInitialized);
     }
 

+ 38 - 14
evpp/TcpClient.h

@@ -11,11 +11,12 @@
 namespace hv {
 
 template<class TSocketChannel = SocketChannel>
-class TcpClientTmpl {
+class TcpClientEventLoopTmpl {
 public:
     typedef std::shared_ptr<TSocketChannel> TSocketChannelPtr;
 
-    TcpClientTmpl() {
+    TcpClientEventLoopTmpl(EventLoopPtr loop = NULL) {
+        loop_ = loop ? loop : std::make_shared<EventLoop>();
         connect_timeout = HIO_DEFAULT_CONNECT_TIMEOUT;
         tls = false;
         tls_setting = NULL;
@@ -23,14 +24,14 @@ public:
         unpack_setting = NULL;
     }
 
-    virtual ~TcpClientTmpl() {
+    virtual ~TcpClientEventLoopTmpl() {
         HV_FREE(tls_setting);
         HV_FREE(reconn_setting);
         HV_FREE(unpack_setting);
     }
 
     const EventLoopPtr& loop() {
-        return loop_thread.loop();
+        return loop_;
     }
 
     //NOTE: By default, not bind local port. If necessary, you can call system api bind() after createsocket().
@@ -53,7 +54,7 @@ public:
             return -2;
         }
 
-        hio_t* io = hio_get(loop_thread.hloop(), connfd);
+        hio_t* io = hio_get(loop_->loop(), connfd);
         assert(io != NULL);
         hio_set_peeraddr(io, remote_addr, SOCKADDR_LEN(remote_addr));
         channel.reset(new TSocketChannel(io));
@@ -123,7 +124,7 @@ public:
         if (!reconn_setting) return -1;
         if (!reconn_setting_can_retry(reconn_setting)) return -2;
         uint32_t delay = reconn_setting_calc_delay(reconn_setting);
-        loop_thread.loop()->setTimeout(delay, [this](TimerID timerID){
+        loop_->setTimeout(delay, [this](TimerID timerID){
             hlogi("reconnect... cnt=%d, delay=%d", reconn_setting->cur_retry_cnt, reconn_setting->cur_delay);
             if (createsocket(&remote_addr.sa) < 0) return;
             startConnect();
@@ -131,13 +132,9 @@ public:
         return 0;
     }
 
-    void start(bool wait_threads_started = true) {
-        loop_thread.start(wait_threads_started, std::bind(&TcpClientTmpl::startConnect, this));
-    }
-    // stop thread-safe
-    void stop(bool wait_threads_stopped = true) {
-        setReconnect(NULL);
-        loop_thread.stop(wait_threads_stopped);
+    // start thread-safe
+    void start() {
+        loop_->runInLoop(std::bind(&TcpClientEventLoopTmpl::startConnect, this));
     }
 
     bool isConnected() {
@@ -217,7 +214,34 @@ public:
     std::function<void(const TSocketChannelPtr&, Buffer*)>  onWriteComplete;
 
 private:
-    EventLoopThread         loop_thread;
+    EventLoopPtr            loop_;
+};
+
+template<class TSocketChannel = SocketChannel>
+class TcpClientTmpl : private EventLoopThread, public TcpClientEventLoopTmpl<TSocketChannel> {
+public:
+    TcpClientTmpl(EventLoopPtr loop = NULL)
+        : EventLoopThread()
+        , TcpClientEventLoopTmpl<TSocketChannel>(EventLoopThread::loop())
+    {}
+    virtual ~TcpClientTmpl() {
+        stop(true);
+    }
+
+    const EventLoopPtr& loop() {
+        return EventLoopThread::loop();
+    }
+
+    // start thread-safe
+    void start(bool wait_threads_started = true) {
+        EventLoopThread::start(wait_threads_started, std::bind(&TcpClientTmpl::startConnect, this));
+    }
+
+    // stop thread-safe
+    void stop(bool wait_threads_stopped = true) {
+        TcpClientTmpl::setReconnect(NULL);
+        EventLoopThread::stop(wait_threads_stopped);
+    }
 };
 
 typedef TcpClientTmpl<SocketChannel> TcpClient;

+ 93 - 0
evpp/TcpClientEventLoop_test.cpp

@@ -0,0 +1,93 @@
+/*
+ * TcpClientEventLoop_test.cpp
+ *
+ * @build   make evpp
+ * @server  bin/TcpServer_test 1234
+ * @client  bin/TcpClientEventLoop_test 1234
+ *
+ */
+
+#include "TcpClient.h"
+#include "htime.h"
+
+#define TEST_RECONNECT  1
+#define TEST_TLS        0
+
+using namespace hv;
+
+class MyTcpClient : public TcpClientEventLoopTmpl<SocketChannel> {
+public:
+    MyTcpClient(EventLoopPtr loop = NULL) : TcpClientEventLoopTmpl<SocketChannel>(loop) {
+        onConnection = [this](const SocketChannelPtr& channel) {
+            std::string peeraddr = channel->peeraddr();
+            if (channel->isConnected()) {
+                printf("connected to %s! connfd=%d\n", peeraddr.c_str(), channel->fd());
+                // send(time) every 3s
+                setInterval(3000, [channel](TimerID timerID){
+                    if (channel->isConnected()) {
+                        if (channel->isWriteComplete()) {
+                            char str[DATETIME_FMT_BUFLEN] = {0};
+                            datetime_t dt = datetime_now();
+                            datetime_fmt(&dt, str);
+                            channel->write(str);
+                        }
+                    } else {
+                        killTimer(timerID);
+                    }
+                });
+            } else {
+                printf("disconnected to %s! connfd=%d\n", peeraddr.c_str(), channel->fd());
+            }
+            if (isReconnect()) {
+                printf("reconnect cnt=%d, delay=%d\n", reconn_setting->cur_retry_cnt, reconn_setting->cur_delay);
+            }
+        };
+
+        onMessage = [](const SocketChannelPtr& channel, Buffer* buf) {
+            printf("< %.*s\n", (int)buf->size(), (char*)buf->data());
+        };
+    }
+
+    int connect(int port) {
+        int connfd = createsocket(port);
+        if (connfd < 0) {
+            return connfd;
+        }
+#if TEST_RECONNECT
+        // reconnect: 1,2,4,8,10,10,10...
+        reconn_setting_t reconn;
+        reconn_setting_init(&reconn);
+        reconn.min_delay = 1000;
+        reconn.max_delay = 10000;
+        reconn.delay_policy = 2;
+        setReconnect(&reconn);
+#endif
+
+#if TEST_TLS
+        withTLS();
+#endif
+        printf("client connect to port %d, connfd=%d ...\n", port, connfd);
+        return startConnect();
+    }
+};
+typedef std::shared_ptr<MyTcpClient> MyTcpClientPtr;
+
+int main(int argc, char* argv[]) {
+    if (argc < 2) {
+        printf("Usage: %s port\n", argv[0]);
+        return -10;
+    }
+    int port = atoi(argv[1]);
+
+    EventLoopPtr loop(new EventLoop);
+
+    MyTcpClientPtr cli1(new MyTcpClient(loop));
+    cli1->connect(port);
+
+    MyTcpClientPtr cli2(new MyTcpClient(loop));
+    cli2->connect(port);
+
+    loop->run();
+
+    return 0;
+}

+ 42 - 13
evpp/TcpServer.h

@@ -11,11 +11,12 @@
 namespace hv {
 
 template<class TSocketChannel = SocketChannel>
-class TcpServerTmpl {
+class TcpServerEventLoopTmpl {
 public:
     typedef std::shared_ptr<TSocketChannel> TSocketChannelPtr;
 
-    TcpServerTmpl() {
+    TcpServerEventLoopTmpl(EventLoopPtr loop = NULL) {
+        acceptor_loop = loop ? loop : std::make_shared<EventLoop>();
         listenfd = -1;
         tls = false;
         unpack_setting.mode = UNPACK_MODE_NONE;
@@ -23,7 +24,7 @@ public:
         load_balance = LB_RoundRobin;
     }
 
-    virtual ~TcpServerTmpl() {
+    virtual ~TcpServerEventLoopTmpl() {
     }
 
     EventLoopPtr loop(int idx = -1) {
@@ -38,7 +39,7 @@ public:
     // closesocket thread-safe
     void closesocket() {
         if (listenfd >= 0) {
-            hio_close_async(hio_get(acceptor_thread.hloop(), listenfd));
+            hio_close_async(hio_get(acceptor_loop->loop(), listenfd));
             listenfd = -1;
         }
     }
@@ -58,7 +59,7 @@ public:
 
     int startAccept() {
         assert(listenfd >= 0);
-        hio_t* listenio = haccept(acceptor_thread.hloop(), listenfd, onAccept);
+        hio_t* listenio = haccept(acceptor_loop->loop(), listenfd, onAccept);
         hevent_set_userdata(listenio, this);
         if (tls) {
             hio_enable_ssl(listenio);
@@ -66,15 +67,15 @@ public:
         return 0;
     }
 
+    // start thread-safe
     void start(bool wait_threads_started = true) {
         if (worker_threads.threadNum() > 0) {
             worker_threads.start(wait_threads_started);
         }
-        acceptor_thread.start(wait_threads_started, std::bind(&TcpServerTmpl::startAccept, this));
+        acceptor_loop->runInLoop(std::bind(&TcpServerEventLoopTmpl::startAccept, this));
     }
     // stop thread-safe
     void stop(bool wait_threads_stopped = true) {
-        acceptor_thread.stop(wait_threads_stopped);
         if (worker_threads.threadNum() > 0) {
             worker_threads.stop(wait_threads_stopped);
         }
@@ -147,7 +148,7 @@ public:
 
 private:
     static void newConnEvent(hio_t* connio) {
-        TcpServerTmpl* server = (TcpServerTmpl*)hevent_userdata(connio);
+        TcpServerEventLoopTmpl* server = (TcpServerEventLoopTmpl*)hevent_userdata(connio);
         if (server->connectionNum() >= server->max_connections) {
             hlogw("over max_connections");
             hio_close(connio);
@@ -196,15 +197,15 @@ private:
     }
 
     static void onAccept(hio_t* connio) {
-        TcpServerTmpl* server = (TcpServerTmpl*)hevent_userdata(connio);
+        TcpServerEventLoopTmpl* server = (TcpServerEventLoopTmpl*)hevent_userdata(connio);
         // NOTE: detach from acceptor loop
         hio_detach(connio);
         EventLoopPtr worker_loop = server->worker_threads.nextLoop(server->load_balance);
         if (worker_loop == NULL) {
-            worker_loop = server->acceptor_thread.loop();
+            worker_loop = server->acceptor_loop;
         }
         ++worker_loop->connectionNum;
-        worker_loop->runInLoop(std::bind(&TcpServerTmpl::newConnEvent, connio));
+        worker_loop->runInLoop(std::bind(&TcpServerEventLoopTmpl::newConnEvent, connio));
     }
 
 public:
@@ -225,8 +226,36 @@ private:
     std::map<uint32_t, TSocketChannelPtr>   channels; // GUAREDE_BY(mutex_)
     std::mutex                              mutex_;
 
-    EventLoopThread                 acceptor_thread;
-    EventLoopThreadPool             worker_threads;
+    EventLoopPtr            acceptor_loop;
+    EventLoopThreadPool     worker_threads;
+};
+
+template<class TSocketChannel = SocketChannel>
+class TcpServerTmpl : private EventLoopThread, public TcpServerEventLoopTmpl<TSocketChannel> {
+public:
+    TcpServerTmpl(EventLoopPtr loop = NULL)
+        : EventLoopThread()
+        , TcpServerEventLoopTmpl<TSocketChannel>(EventLoopThread::loop())
+    {}
+    virtual ~TcpServerTmpl() {
+        stop(true);
+    }
+
+    const EventLoopPtr& loop(int idx = -1) {
+        return TcpServerEventLoopTmpl<TSocketChannel>::loop(idx);
+    }
+
+    // start thread-safe
+    void start(bool wait_threads_started = true) {
+        TcpServerEventLoopTmpl<TSocketChannel>::start(wait_threads_started);
+        EventLoopThread::start(wait_threads_started);
+    }
+
+    // stop thread-safe
+    void stop(bool wait_threads_stopped = true) {
+        EventLoopThread::stop(wait_threads_stopped);
+        TcpServerEventLoopTmpl<TSocketChannel>::stop(wait_threads_stopped);
+    }
 };
 
 typedef TcpServerTmpl<SocketChannel> TcpServer;

+ 36 - 12
evpp/UdpClient.h

@@ -9,27 +9,28 @@
 namespace hv {
 
 template<class TSocketChannel = SocketChannel>
-class UdpClientTmpl {
+class UdpClientEventLoopTmpl {
 public:
     typedef std::shared_ptr<TSocketChannel> TSocketChannelPtr;
 
-    UdpClientTmpl() {
+    UdpClientEventLoopTmpl(EventLoopPtr loop = NULL) {
+        loop_ = loop ? loop : std::make_shared<EventLoop>();
 #if WITH_KCP
         enable_kcp = false;
 #endif
     }
 
-    virtual ~UdpClientTmpl() {
+    virtual ~UdpClientEventLoopTmpl() {
     }
 
     const EventLoopPtr& loop() {
-        return loop_thread.loop();
+        return loop_;
     }
 
     //NOTE: By default, not bind local port. If necessary, you can call system api bind() after createsocket().
     //@retval >=0 sockfd, <0 error
     int createsocket(int remote_port, const char* remote_host = "127.0.0.1") {
-        hio_t* io = hloop_create_udp_client(loop_thread.hloop(), remote_host, remote_port);
+        hio_t* io = hloop_create_udp_client(loop_->loop(), remote_host, remote_port);
         if (io == NULL) return -1;
         channel.reset(new TSocketChannel(io));
         return channel->fd();
@@ -61,12 +62,9 @@ public:
         return channel->startRead();
     }
 
-    void start(bool wait_threads_started = true) {
-        loop_thread.start(wait_threads_started, std::bind(&UdpClientTmpl::startRecv, this));
-    }
-    // stop thread-safe
-    void stop(bool wait_threads_stopped = true) {
-        loop_thread.stop(wait_threads_stopped);
+    // start thread-safe
+    void start() {
+        loop_->runInLoop(std::bind(&UdpClientEventLoopTmpl::startRecv, this));
     }
 
     // sendto thread-safe
@@ -107,7 +105,33 @@ public:
 
 private:
     std::mutex              sendto_mutex;
-    EventLoopThread         loop_thread;
+    EventLoopPtr            loop_;
+};
+
+template<class TSocketChannel = SocketChannel>
+class UdpClientTmpl : private EventLoopThread, public UdpClientEventLoopTmpl<TSocketChannel> {
+public:
+    UdpClientTmpl(EventLoopPtr loop = NULL)
+        : EventLoopThread()
+        , UdpClientEventLoopTmpl<TSocketChannel>(EventLoopThread::loop())
+    {}
+    virtual ~UdpClientTmpl() {
+        stop(true);
+    }
+
+    const EventLoopPtr& loop() {
+        return EventLoopThread::loop();
+    }
+
+    // start thread-safe
+    void start(bool wait_threads_started = true) {
+        EventLoopThread::start(wait_threads_started, std::bind(&UdpClientTmpl::startRecv, this));
+    }
+
+    // stop thread-safe
+    void stop(bool wait_threads_stopped = true) {
+        EventLoopThread::stop(wait_threads_stopped);
+    }
 };
 
 typedef UdpClientTmpl<SocketChannel> UdpClient;

+ 36 - 12
evpp/UdpServer.h

@@ -9,26 +9,27 @@
 namespace hv {
 
 template<class TSocketChannel = SocketChannel>
-class UdpServerTmpl {
+class UdpServerEventLoopTmpl {
 public:
     typedef std::shared_ptr<TSocketChannel> TSocketChannelPtr;
 
-    UdpServerTmpl() {
+    UdpServerEventLoopTmpl(EventLoopPtr loop = NULL) {
+        loop_ = loop ? loop : std::make_shared<EventLoop>();
 #if WITH_KCP
         enable_kcp = false;
 #endif
     }
 
-    virtual ~UdpServerTmpl() {
+    virtual ~UdpServerEventLoopTmpl() {
     }
 
     const EventLoopPtr& loop() {
-        return loop_thread.loop();
+        return loop_;
     }
 
     //@retval >=0 bindfd, <0 error
     int createsocket(int port, const char* host = "0.0.0.0") {
-        hio_t* io = hloop_create_udp_server(loop_thread.hloop(), host, port);
+        hio_t* io = hloop_create_udp_server(loop_->loop(), host, port);
         if (io == NULL) return -1;
         channel.reset(new TSocketChannel(io));
         return channel->fd();
@@ -60,12 +61,9 @@ public:
         return channel->startRead();
     }
 
-    void start(bool wait_threads_started = true) {
-        loop_thread.start(wait_threads_started, std::bind(&UdpServerTmpl::startRecv, this));
-    }
-    // stop thread-safe
-    void stop(bool wait_threads_stopped = true) {
-        loop_thread.stop(wait_threads_stopped);
+    // start thread-safe
+    void start() {
+        loop_->runInLoop(std::bind(&UdpServerEventLoopTmpl::startRecv, this));
     }
 
     // sendto thread-safe
@@ -95,7 +93,33 @@ public:
 
 private:
     std::mutex              sendto_mutex;
-    EventLoopThread         loop_thread;
+    EventLoopPtr            loop_;
+};
+
+template<class TSocketChannel = SocketChannel>
+class UdpServerTmpl : private EventLoopThread, public UdpServerEventLoopTmpl<TSocketChannel> {
+public:
+    UdpServerTmpl(EventLoopPtr loop = NULL)
+        : EventLoopThread()
+        , UdpServerEventLoopTmpl<TSocketChannel>(EventLoopThread::loop())
+    {}
+    virtual ~UdpServerTmpl() {
+        stop(true);
+    }
+
+    const EventLoopPtr& loop() {
+        return EventLoopThread::loop();
+    }
+
+    // start thread-safe
+    void start(bool wait_threads_started = true) {
+        EventLoopThread::start(wait_threads_started, std::bind(&UdpServerTmpl::startRecv, this));
+    }
+
+    // stop thread-safe
+    void stop(bool wait_threads_stopped = true) {
+        EventLoopThread::stop(wait_threads_stopped);
+    }
 };
 
 typedef UdpServerTmpl<SocketChannel> UdpServer;