Browse Source

#155: TcpServer::getChannelById

ithewei 3 years ago
parent
commit
bb2a49fb9e
4 changed files with 24 additions and 19 deletions
  1. 1 0
      event/hloop.h
  2. 4 4
      evpp/Channel.h
  3. 17 13
      evpp/TcpServer.h
  4. 2 2
      evpp/TcpServer_test.cpp

+ 1 - 0
event/hloop.h

@@ -445,6 +445,7 @@ HV_EXPORT hio_t* hio_setup_udp_upstream(hio_t* io, const char* host, int port);
 
 //-----------------unpack---------------------------------------------
 typedef enum {
+    UNPACK_MODE_NONE        = 0,
     UNPACK_BY_FIXED_LENGTH  = 1,    // Not recommended
     UNPACK_BY_DELIMITER     = 2,    // Suitable for text protocol
     UNPACK_BY_LENGTH_FIELD  = 3,    // Suitable for binary protocol

+ 4 - 4
evpp/Channel.h

@@ -155,6 +155,7 @@ public:
         CLOSED,
     } status;
     std::function<void(Buffer*)> onread;
+    // NOTE: Use Channel::isWriteComplete in onwrite callback to determine whether all data has been written.
     std::function<void(Buffer*)> onwrite;
     std::function<void()>        onclose;
 
@@ -217,37 +218,36 @@ public:
         return hio_new_ssl_ctx(io_, opt);
     }
 
+    // timeout
     void setConnectTimeout(int timeout_ms) {
         if (io_ == NULL) return;
         hio_set_connect_timeout(io_, timeout_ms);
     }
-
     void setCloseTimeout(int timeout_ms) {
         if (io_ == NULL) return;
         hio_set_close_timeout(io_, timeout_ms);
     }
-
     void setReadTimeout(int timeout_ms) {
         if (io_ == NULL) return;
         hio_set_read_timeout(io_, timeout_ms);
     }
-
     void setWriteTimeout(int timeout_ms) {
         if (io_ == NULL) return;
         hio_set_write_timeout(io_, timeout_ms);
     }
-
     void setKeepaliveTimeout(int timeout_ms) {
         if (io_ == NULL) return;
         hio_set_keepalive_timeout(io_, timeout_ms);
     }
 
+    // heartbeat
     void setHeartbeat(int interval_ms, std::function<void()> fn) {
         if (io_ == NULL) return;
         heartbeat = std::move(fn);
         hio_set_heartbeat(io_, interval_ms, send_heartbeat);
     }
 
+    // unpack
     void setUnpack(unpack_setting_t* setting) {
         if (io_ == NULL) return;
         hio_set_unpack(io_, setting);

+ 17 - 13
evpp/TcpServer.h

@@ -18,7 +18,7 @@ public:
     TcpServerTmpl() {
         listenfd = -1;
         tls = false;
-        enable_unpack = false;
+        unpack_setting.mode = UNPACK_MODE_NONE;
         max_connections = 0xFFFFFFFF;
     }
 
@@ -93,26 +93,31 @@ public:
 
     void setUnpack(unpack_setting_t* setting) {
         if (setting) {
-            enable_unpack = true;
             unpack_setting = *setting;
         } else {
-            enable_unpack = false;
+            unpack_setting.mode = UNPACK_MODE_NONE;
         }
     }
 
     // channel
     const TSocketChannelPtr& addChannel(hio_t* io) {
-        int fd = hio_fd(io);
+        uint32_t id = hio_id(io);
         auto channel = TSocketChannelPtr(new TSocketChannel(io));
         std::lock_guard<std::mutex> locker(mutex_);
-        channels[fd] = channel;
-        return channels[fd];
+        channels[id] = channel;
+        return channels[id];
+    }
+
+    TSocketChannelPtr getChannelById(uint32_t id) {
+        std::lock_guard<std::mutex> locker(mutex_);
+        auto iter = channels.find(id);
+        return iter != channels.end() ? iter->second : NULL;
     }
 
     void removeChannel(const TSocketChannelPtr& channel) {
-        int fd = channel->fd();
+        uint32_t id = channel->id();
         std::lock_guard<std::mutex> locker(mutex_);
-        channels.erase(fd);
+        channels.erase(id);
     }
 
     size_t connectionNum() {
@@ -176,7 +181,7 @@ private:
             // so in this lambda function, no code should be added below.
         };
 
-        if (server->enable_unpack) {
+        if (server->unpack_setting.mode != UNPACK_MODE_NONE) {
             channel->setUnpack(&server->unpack_setting);
         }
         channel->startRead();
@@ -200,7 +205,6 @@ private:
 public:
     int                     listenfd;
     bool                    tls;
-    bool                    enable_unpack;
     unpack_setting_t        unpack_setting;
     // Callback
     std::function<void(const TSocketChannelPtr&)>           onConnection;
@@ -211,9 +215,9 @@ public:
     uint32_t                max_connections;
 
 private:
-    // fd => TSocketChannelPtr
-    std::map<int, TSocketChannelPtr> channels; // GUAREDE_BY(mutex_)
-    std::mutex                       mutex_;
+    // id => TSocketChannelPtr
+    std::map<uint32_t, TSocketChannelPtr>   channels; // GUAREDE_BY(mutex_)
+    std::mutex                              mutex_;
 
     EventLoopThread                 acceptor_thread;
     EventLoopThreadPool             worker_threads;

+ 2 - 2
evpp/TcpServer_test.cpp

@@ -27,9 +27,9 @@ int main(int argc, char* argv[]) {
     srv.onConnection = [](const SocketChannelPtr& channel) {
         std::string peeraddr = channel->peeraddr();
         if (channel->isConnected()) {
-            printf("%s connected! connfd=%d tid=%ld\n", peeraddr.c_str(), channel->fd(), currentThreadEventLoop->tid());
+            printf("%s connected! connfd=%d id=%d tid=%ld\n", peeraddr.c_str(), channel->fd(), channel->id(), currentThreadEventLoop->tid());
         } else {
-            printf("%s disconnected! connfd=%d tid=%ld\n", peeraddr.c_str(), channel->fd(), currentThreadEventLoop->tid());
+            printf("%s disconnected! connfd=%d id=%d tid=%ld\n", peeraddr.c_str(), channel->fd(), channel->id(), currentThreadEventLoop->tid());
         }
     };
     srv.onMessage = [](const SocketChannelPtr& channel, Buffer* buf) {