Bläddra i källkod

Impl LoadBalance

ithewei 3 år sedan
förälder
incheckning
2d2fcbeec4
6 ändrade filer med 68 tillägg och 5 borttagningar
  1. 9 0
      event/hloop.h
  2. 3 0
      evpp/EventLoop.h
  3. 20 3
      evpp/EventLoopThreadPool.h
  4. 11 0
      evpp/TcpClient_test.cpp
  5. 12 2
      evpp/TcpServer.h
  6. 13 0
      evpp/TcpServer_test.cpp

+ 9 - 0
event/hloop.h

@@ -627,6 +627,15 @@ HV_INLINE uint32_t reconn_setting_calc_delay(reconn_setting_t* reconn) {
     return reconn->cur_delay;
 }
 
+//-----------------LoadBalance-------------------------------------
+typedef enum {
+    LB_RoundRobin,
+    LB_Random,
+    LB_LeastConnections,
+    LB_IpHash,
+    LB_UrlHash,
+} load_balance_e;
+
 //-----------------rudp---------------------------------------------
 #if WITH_KCP
 #define WITH_RUDP 1

+ 3 - 0
evpp/EventLoop.h

@@ -32,6 +32,7 @@ public:
             loop_ = hloop_new(HLOOP_FLAG_AUTO_FREE);
             is_loop_owner = true;
         }
+        connectionNum = 0;
         setStatus(kInitialized);
     }
 
@@ -212,6 +213,8 @@ private:
         if (ev && ev->cb) ev->cb(ev.get());
     }
 
+public:
+    std::atomic<uint32_t>       connectionNum;  // for LB_LeastConnections
 private:
     hloop_t*                    loop_;
     bool                        is_loop_owner;

+ 20 - 3
evpp/EventLoopThreadPool.h

@@ -2,6 +2,7 @@
 #define HV_EVENT_LOOP_THREAD_POOL_HPP_
 
 #include "EventLoopThread.h"
+#include "hbase.h"
 
 namespace hv {
 
@@ -27,9 +28,25 @@ public:
         thread_num_ = num;
     }
 
-    EventLoopPtr nextLoop() {
-        if (loop_threads_.empty()) return NULL;
-        return loop_threads_[++next_loop_idx_ % loop_threads_.size()]->loop();
+    EventLoopPtr nextLoop(load_balance_e lb = LB_RoundRobin) {
+        int numLoops = loop_threads_.size();
+        if (numLoops == 0) return NULL;
+        int idx = 0;
+        if (lb == LB_RoundRobin) {
+            if (++next_loop_idx_ >= numLoops) next_loop_idx_ = 0;
+            idx = next_loop_idx_;
+        } else if (lb == LB_Random) {
+            idx = hv_rand(0, numLoops - 1);
+        } else if (lb == LB_LeastConnections) {
+            for (int i = 1; i < numLoops; ++i) {
+                if (loop_threads_[i]->loop()->connectionNum < loop_threads_[idx]->loop()->connectionNum) {
+                    idx = i;
+                }
+            }
+        } else {
+            // Not Implemented
+        }
+        return loop_threads_[idx]->loop();
     }
 
     EventLoopPtr loop(int idx = -1) {

+ 11 - 0
evpp/TcpClient_test.cpp

@@ -10,6 +10,9 @@
 #include "TcpClient.h"
 #include "htime.h"
 
+#define TEST_RECONNECT  1
+#define TEST_TLS        0
+
 using namespace hv;
 
 int main(int argc, char* argv[]) {
@@ -52,6 +55,8 @@ int main(int argc, char* argv[]) {
     cli.onMessage = [](const SocketChannelPtr& channel, Buffer* buf) {
         printf("< %.*s\n", (int)buf->size(), (char*)buf->data());
     };
+
+#if TEST_RECONNECT
     // reconnect: 1,2,4,8,10,10,10...
     reconn_setting_t reconn;
     reconn_setting_init(&reconn);
@@ -59,6 +64,12 @@ int main(int argc, char* argv[]) {
     reconn.max_delay = 10000;
     reconn.delay_policy = 2;
     cli.setReconnect(&reconn);
+#endif
+
+#if TEST_TLS
+    cli.withTLS();
+#endif
+
     cli.start();
 
     // press Enter to stop

+ 12 - 2
evpp/TcpServer.h

@@ -20,6 +20,7 @@ public:
         tls = false;
         unpack_setting.mode = UNPACK_MODE_NONE;
         max_connections = 0xFFFFFFFF;
+        load_balance = LB_RoundRobin;
     }
 
     virtual ~TcpServerTmpl() {
@@ -46,6 +47,10 @@ public:
         max_connections = num;
     }
 
+    void setLoadBalance(load_balance_e lb) {
+        load_balance = lb;
+    }
+
     // NOTE: totalThreadNum = 1 acceptor_thread + N worker_threads (N can be 0)
     void setThreadNum(int num) {
         worker_threads.setThreadNum(num);
@@ -168,6 +173,10 @@ private:
             }
         };
         channel->onclose = [server, &channel]() {
+            EventLoop* worker_loop = currentThreadEventLoop;
+            assert(worker_loop != NULL);
+            --worker_loop->connectionNum;
+
             channel->status = SocketChannel::CLOSED;
             if (server->onConnection) {
                 server->onConnection(channel);
@@ -190,11 +199,11 @@ private:
         TcpServerTmpl* server = (TcpServerTmpl*)hevent_userdata(connio);
         // NOTE: detach from acceptor loop
         hio_detach(connio);
-        // Load Banlance: Round-Robin
-        EventLoopPtr worker_loop = server->worker_threads.nextLoop();
+        EventLoopPtr worker_loop = server->worker_threads.nextLoop(server->load_balance);
         if (worker_loop == NULL) {
             worker_loop = server->acceptor_thread.loop();
         }
+        ++worker_loop->connectionNum;
         worker_loop->runInLoop(std::bind(&TcpServerTmpl::newConnEvent, connio));
     }
 
@@ -209,6 +218,7 @@ public:
     std::function<void(const TSocketChannelPtr&, Buffer*)>  onWriteComplete;
 
     uint32_t                max_connections;
+    load_balance_e          load_balance;
 
 private:
     // id => TSocketChannelPtr

+ 13 - 0
evpp/TcpServer_test.cpp

@@ -11,6 +11,8 @@
 
 using namespace hv;
 
+#define TEST_TLS        0
+
 int main(int argc, char* argv[]) {
     if (argc < 2) {
         printf("Usage: %s port\n", argv[0]);
@@ -40,6 +42,17 @@ int main(int argc, char* argv[]) {
         channel->write(buf);
     };
     srv.setThreadNum(4);
+    srv.setLoadBalance(LB_LeastConnections);
+
+#if TEST_TLS
+    hssl_ctx_opt_t ssl_opt;
+    memset(&ssl_opt, 0, sizeof(hssl_ctx_opt_t));
+    ssl_opt.crt_file = "cert/server.crt";
+    ssl_opt.key_file = "cert/server.key";
+    ssl_opt.verify_peer = 0;
+    srv.withTLS(&ssl_opt);
+#endif
+
     srv.start();
 
     // press Enter to stop