1
0
Эх сурвалжийг харах

template<class TSocketChannel>

ithewei 4 жил өмнө
parent
commit
897f7d3b91

+ 0 - 1
Makefile.vars

@@ -51,7 +51,6 @@ CPPUTIL_HEADERS = cpputil/hmap.h\
 				cpputil/ThreadLocalStorage.h\
 				cpputil/ThreadLocalStorage.h\
 
 
 EVPP_HEADERS  = evpp/Buffer.h\
 EVPP_HEADERS  = evpp/Buffer.h\
-				evpp/Callback.h\
 				evpp/Channel.h\
 				evpp/Channel.h\
 				evpp/Event.h\
 				evpp/Event.h\
 				evpp/EventLoop.h\
 				evpp/EventLoop.h\

+ 0 - 1
cmake/vars.cmake

@@ -54,7 +54,6 @@ set(CPPUTIL_HEADERS
 
 
 set(EVPP_HEADERS
 set(EVPP_HEADERS
     evpp/Buffer.h
     evpp/Buffer.h
-    evpp/Callback.h
     evpp/Channel.h
     evpp/Channel.h
     evpp/Event.h
     evpp/Event.h
     evpp/EventLoop.h
     evpp/EventLoop.h

+ 0 - 17
evpp/Callback.h

@@ -1,17 +0,0 @@
-#ifndef HV_CALLBACK_HPP_
-#define HV_CALLBACK_HPP_
-
-#include <functional>
-
-#include "Buffer.h"
-#include "Channel.h"
-
-namespace hv {
-
-typedef std::function<void(const SocketChannelPtr&)>            ConnectionCallback;
-typedef std::function<void(const SocketChannelPtr&, Buffer*)>   MessageCallback;
-typedef std::function<void(const SocketChannelPtr&, Buffer*)>   WriteCompleteCallback;
-
-}
-
-#endif // HV_CALLBACK_HPP_

+ 0 - 1
evpp/TcpClient.h

@@ -6,7 +6,6 @@
 #include "hlog.h"
 #include "hlog.h"
 
 
 #include "EventLoopThread.h"
 #include "EventLoopThread.h"
-#include "Callback.h"
 #include "Channel.h"
 #include "Channel.h"
 
 
 namespace hv {
 namespace hv {

+ 24 - 20
evpp/TcpServer.h

@@ -6,21 +6,23 @@
 #include "hlog.h"
 #include "hlog.h"
 
 
 #include "EventLoopThreadPool.h"
 #include "EventLoopThreadPool.h"
-#include "Callback.h"
 #include "Channel.h"
 #include "Channel.h"
 
 
 namespace hv {
 namespace hv {
 
 
-class TcpServer {
+template<class TSocketChannel = SocketChannel>
+class TcpServerTmpl {
 public:
 public:
-    TcpServer() {
+    typedef std::shared_ptr<TSocketChannel> TSocketChannelPtr;
+
+    TcpServerTmpl() {
         listenfd = -1;
         listenfd = -1;
         tls = false;
         tls = false;
         enable_unpack = false;
         enable_unpack = false;
         max_connections = 0xFFFFFFFF;
         max_connections = 0xFFFFFFFF;
     }
     }
 
 
-    virtual ~TcpServer() {
+    virtual ~TcpServerTmpl() {
     }
     }
 
 
     EventLoopPtr loop(int idx = -1) {
     EventLoopPtr loop(int idx = -1) {
@@ -58,7 +60,7 @@ public:
 
 
     void start(bool wait_threads_started = true) {
     void start(bool wait_threads_started = true) {
         worker_threads.start(wait_threads_started);
         worker_threads.start(wait_threads_started);
-        acceptor_thread.start(wait_threads_started, std::bind(&TcpServer::startAccept, this));
+        acceptor_thread.start(wait_threads_started, std::bind(&TcpServerTmpl::startAccept, this));
     }
     }
     void stop(bool wait_threads_stopped = true) {
     void stop(bool wait_threads_stopped = true) {
         acceptor_thread.stop(wait_threads_stopped);
         acceptor_thread.stop(wait_threads_stopped);
@@ -91,15 +93,15 @@ public:
     }
     }
 
 
     // channel
     // channel
-    const SocketChannelPtr& addChannel(hio_t* io) {
+    const TSocketChannelPtr& addChannel(hio_t* io) {
         int fd = hio_fd(io);
         int fd = hio_fd(io);
-        auto channel = SocketChannelPtr(new SocketChannel(io));
+        auto channel = TSocketChannelPtr(new TSocketChannel(io));
         std::lock_guard<std::mutex> locker(mutex_);
         std::lock_guard<std::mutex> locker(mutex_);
         channels[fd] = channel;
         channels[fd] = channel;
         return channels[fd];
         return channels[fd];
     }
     }
 
 
-    void removeChannel(const SocketChannelPtr& channel) {
+    void removeChannel(const TSocketChannelPtr& channel) {
         int fd = channel->fd();
         int fd = channel->fd();
         std::lock_guard<std::mutex> locker(mutex_);
         std::lock_guard<std::mutex> locker(mutex_);
         channels.erase(fd);
         channels.erase(fd);
@@ -110,7 +112,7 @@ public:
         return channels.size();
         return channels.size();
     }
     }
 
 
-    int foreachChannel(std::function<void(const SocketChannelPtr& channel)> fn) {
+    int foreachChannel(std::function<void(const TSocketChannelPtr& channel)> fn) {
         std::lock_guard<std::mutex> locker(mutex_);
         std::lock_guard<std::mutex> locker(mutex_);
         for (auto& pair : channels) {
         for (auto& pair : channels) {
             fn(pair.second);
             fn(pair.second);
@@ -119,7 +121,7 @@ public:
     }
     }
 
 
     int broadcast(const void* data, int size) {
     int broadcast(const void* data, int size) {
-        return foreachChannel([data, size](const SocketChannelPtr& channel) {
+        return foreachChannel([data, size](const TSocketChannelPtr& channel) {
             channel->write(data, size);
             channel->write(data, size);
         });
         });
     }
     }
@@ -130,7 +132,7 @@ public:
 
 
 private:
 private:
     static void newConnEvent(hio_t* connio) {
     static void newConnEvent(hio_t* connio) {
-        TcpServer* server = (TcpServer*)hevent_userdata(connio);
+        TcpServerTmpl* server = (TcpServerTmpl*)hevent_userdata(connio);
         if (server->connectionNum() >= server->max_connections) {
         if (server->connectionNum() >= server->max_connections) {
             hlogw("over max_connections");
             hlogw("over max_connections");
             hio_close(connio);
             hio_close(connio);
@@ -142,7 +144,7 @@ private:
         assert(worker_loop != NULL);
         assert(worker_loop != NULL);
         hio_attach(worker_loop->loop(), connio);
         hio_attach(worker_loop->loop(), connio);
 
 
-        const SocketChannelPtr& channel = server->addChannel(connio);
+        const TSocketChannelPtr& channel = server->addChannel(connio);
         channel->status = SocketChannel::CONNECTED;
         channel->status = SocketChannel::CONNECTED;
 
 
         channel->onread = [server, &channel](Buffer* buf) {
         channel->onread = [server, &channel](Buffer* buf) {
@@ -175,12 +177,12 @@ private:
     }
     }
 
 
     static void onAccept(hio_t* connio) {
     static void onAccept(hio_t* connio) {
-        TcpServer* server = (TcpServer*)hevent_userdata(connio);
+        TcpServerTmpl* server = (TcpServerTmpl*)hevent_userdata(connio);
         // NOTE: detach from acceptor loop
         // NOTE: detach from acceptor loop
         hio_detach(connio);
         hio_detach(connio);
         // Load Banlance: Round-Robin
         // Load Banlance: Round-Robin
         EventLoopPtr worker_loop = server->worker_threads.nextLoop();
         EventLoopPtr worker_loop = server->worker_threads.nextLoop();
-        worker_loop->queueInLoop(std::bind(&TcpServer::newConnEvent, connio));
+        worker_loop->queueInLoop(std::bind(&TcpServerTmpl::newConnEvent, connio));
     }
     }
 
 
 public:
 public:
@@ -189,21 +191,23 @@ public:
     bool                    enable_unpack;
     bool                    enable_unpack;
     unpack_setting_t        unpack_setting;
     unpack_setting_t        unpack_setting;
     // Callback
     // Callback
-    ConnectionCallback      onConnection;
-    MessageCallback         onMessage;
-    WriteCompleteCallback   onWriteComplete;
+    std::function<void(const TSocketChannelPtr&)>           onConnection;
+    std::function<void(const TSocketChannelPtr&, Buffer*)>  onMessage;
+    std::function<void(const TSocketChannelPtr&, Buffer*)>  onWriteComplete;
 
 
     uint32_t                max_connections;
     uint32_t                max_connections;
 
 
 private:
 private:
-    // fd => SocketChannelPtr
-    std::map<int, SocketChannelPtr> channels; // GUAREDE_BY(mutex_)
-    std::mutex                      mutex_;
+    // fd => TSocketChannelPtr
+    std::map<int, TSocketChannelPtr> channels; // GUAREDE_BY(mutex_)
+    std::mutex                       mutex_;
 
 
     EventLoopThread                 acceptor_thread;
     EventLoopThread                 acceptor_thread;
     EventLoopThreadPool             worker_threads;
     EventLoopThreadPool             worker_threads;
 };
 };
 
 
+typedef TcpServerTmpl<SocketChannel> TcpServer;
+
 }
 }
 
 
 #endif // HV_TCP_SERVER_HPP_
 #endif // HV_TCP_SERVER_HPP_

+ 13 - 9
evpp/UdpClient.h

@@ -4,20 +4,22 @@
 #include "hsocket.h"
 #include "hsocket.h"
 
 
 #include "EventLoopThread.h"
 #include "EventLoopThread.h"
-#include "Callback.h"
 #include "Channel.h"
 #include "Channel.h"
 
 
 namespace hv {
 namespace hv {
 
 
-class UdpClient {
+template<class TSocketChannel = SocketChannel>
+class UdpClientTmpl {
 public:
 public:
-    UdpClient() {
+    typedef std::shared_ptr<TSocketChannel> TSocketChannelPtr;
+
+    UdpClientTmpl() {
 #if WITH_KCP
 #if WITH_KCP
         enable_kcp = false;
         enable_kcp = false;
 #endif
 #endif
     }
     }
 
 
-    virtual ~UdpClient() {
+    virtual ~UdpClientTmpl() {
     }
     }
 
 
     const EventLoopPtr& loop() {
     const EventLoopPtr& loop() {
@@ -28,7 +30,7 @@ public:
     int createsocket(int port, const char* host = "127.0.0.1") {
     int createsocket(int port, const char* host = "127.0.0.1") {
         hio_t* io = hloop_create_udp_client(loop_thread.hloop(), host, port);
         hio_t* io = hloop_create_udp_client(loop_thread.hloop(), host, port);
         if (io == NULL) return -1;
         if (io == NULL) return -1;
-        channel.reset(new SocketChannel(io));
+        channel.reset(new TSocketChannel(io));
         return channel->fd();
         return channel->fd();
     }
     }
     void closesocket() {
     void closesocket() {
@@ -59,7 +61,7 @@ public:
     }
     }
 
 
     void start(bool wait_threads_started = true) {
     void start(bool wait_threads_started = true) {
-        loop_thread.start(wait_threads_started, std::bind(&UdpClient::startRecv, this));
+        loop_thread.start(wait_threads_started, std::bind(&UdpClientTmpl::startRecv, this));
     }
     }
     void stop(bool wait_threads_stopped = true) {
     void stop(bool wait_threads_stopped = true) {
         loop_thread.stop(wait_threads_stopped);
         loop_thread.stop(wait_threads_stopped);
@@ -90,20 +92,22 @@ public:
 #endif
 #endif
 
 
 public:
 public:
-    SocketChannelPtr        channel;
+    TSocketChannelPtr       channel;
 #if WITH_KCP
 #if WITH_KCP
     bool                    enable_kcp;
     bool                    enable_kcp;
     kcp_setting_t           kcp_setting;
     kcp_setting_t           kcp_setting;
 #endif
 #endif
     // Callback
     // Callback
-    MessageCallback         onMessage;
-    WriteCompleteCallback   onWriteComplete;
+    std::function<void(const TSocketChannelPtr&, Buffer*)>  onMessage;
+    std::function<void(const TSocketChannelPtr&, Buffer*)>  onWriteComplete;
 
 
 private:
 private:
     std::mutex              sendto_mutex;
     std::mutex              sendto_mutex;
     EventLoopThread         loop_thread;
     EventLoopThread         loop_thread;
 };
 };
 
 
+typedef UdpClientTmpl<SocketChannel> UdpClient;
+
 }
 }
 
 
 #endif // HV_UDP_CLIENT_HPP_
 #endif // HV_UDP_CLIENT_HPP_

+ 13 - 9
evpp/UdpServer.h

@@ -4,20 +4,22 @@
 #include "hsocket.h"
 #include "hsocket.h"
 
 
 #include "EventLoopThreadPool.h"
 #include "EventLoopThreadPool.h"
-#include "Callback.h"
 #include "Channel.h"
 #include "Channel.h"
 
 
 namespace hv {
 namespace hv {
 
 
-class UdpServer {
+template<class TSocketChannel = SocketChannel>
+class UdpServerTmpl {
 public:
 public:
-    UdpServer() {
+    typedef std::shared_ptr<TSocketChannel> TSocketChannelPtr;
+
+    UdpServerTmpl() {
 #if WITH_KCP
 #if WITH_KCP
         enable_kcp = false;
         enable_kcp = false;
 #endif
 #endif
     }
     }
 
 
-    virtual ~UdpServer() {
+    virtual ~UdpServerTmpl() {
     }
     }
 
 
     const EventLoopPtr& loop() {
     const EventLoopPtr& loop() {
@@ -28,7 +30,7 @@ public:
     int createsocket(int port, const char* host = "0.0.0.0") {
     int createsocket(int port, const char* host = "0.0.0.0") {
         hio_t* io = hloop_create_udp_server(loop_thread.hloop(), host, port);
         hio_t* io = hloop_create_udp_server(loop_thread.hloop(), host, port);
         if (io == NULL) return -1;
         if (io == NULL) return -1;
-        channel.reset(new SocketChannel(io));
+        channel.reset(new TSocketChannel(io));
         return channel->fd();
         return channel->fd();
     }
     }
     void closesocket() {
     void closesocket() {
@@ -59,7 +61,7 @@ public:
     }
     }
 
 
     void start(bool wait_threads_started = true) {
     void start(bool wait_threads_started = true) {
-        loop_thread.start(wait_threads_started, std::bind(&UdpServer::startRecv, this));
+        loop_thread.start(wait_threads_started, std::bind(&UdpServerTmpl::startRecv, this));
     }
     }
     void stop(bool wait_threads_stopped = true) {
     void stop(bool wait_threads_stopped = true) {
         loop_thread.stop(wait_threads_stopped);
         loop_thread.stop(wait_threads_stopped);
@@ -79,20 +81,22 @@ public:
     }
     }
 
 
 public:
 public:
-    SocketChannelPtr        channel;
+    TSocketChannelPtr       channel;
 #if WITH_KCP
 #if WITH_KCP
     bool                    enable_kcp;
     bool                    enable_kcp;
     kcp_setting_t           kcp_setting;
     kcp_setting_t           kcp_setting;
 #endif
 #endif
     // Callback
     // Callback
-    MessageCallback         onMessage;
-    WriteCompleteCallback   onWriteComplete;
+    std::function<void(const TSocketChannelPtr&, Buffer*)>  onMessage;
+    std::function<void(const TSocketChannelPtr&, Buffer*)>  onWriteComplete;
 
 
 private:
 private:
     std::mutex              sendto_mutex;
     std::mutex              sendto_mutex;
     EventLoopThread         loop_thread;
     EventLoopThread         loop_thread;
 };
 };
 
 
+typedef UdpServerTmpl<SocketChannel> UdpServer;
+
 }
 }
 
 
 #endif // HV_UDP_SERVER_HPP_
 #endif // HV_UDP_SERVER_HPP_