ithewei 4 lat temu
rodzic
commit
3abebcd20e
4 zmienionych plików z 88 dodań i 70 usunięć
  1. 9 7
      evpp/TcpClient.h
  2. 21 13
      evpp/TcpServer.h
  3. 29 26
      evpp/UdpClient.h
  4. 29 24
      evpp/UdpServer.h

+ 9 - 7
evpp/TcpClient.h

@@ -67,7 +67,6 @@ public:
         }
         return createsocket(&peeraddr.sa);
     }
-
     int createsocket(struct sockaddr* peeraddr) {
         int connfd = socket(peeraddr->sa_family, SOCK_STREAM, 0);
         // SOCKADDR_PRINT(peeraddr);
@@ -82,6 +81,12 @@ public:
         channel.reset(new TSocketChannel(io));
         return connfd;
     }
+    void closesocket() {
+        if (channel) {
+            channel->close();
+            channel = NULL;
+        }
+    }
 
     int startConnect() {
         assert(channel != NULL);
@@ -166,15 +171,11 @@ public:
         if (!isConnected()) return -1;
         return channel->write(data, size);
     }
-
     int send(Buffer* buf) {
-        if (!isConnected()) return -1;
-        return channel->write(buf);
+        return send(buf->data(), buf->size());
     }
-
     int send(const std::string& str) {
-        if (!isConnected()) return -1;
-        return channel->write(str);
+        return send(str.data(), str.size());
     }
 
     int withTLS(const char* cert_file = NULL, const char* key_file = NULL, bool verify_peer = false) {
@@ -231,6 +232,7 @@ public:
     std::function<void(const TSocketChannelPtr&)>           onConnection;
     std::function<void(const TSocketChannelPtr&, Buffer*)>  onMessage;
     std::function<void(const TSocketChannelPtr&, Buffer*)>  onWriteComplete;
+
 private:
     EventLoopThread         loop_thread;
 };

+ 21 - 13
evpp/TcpServer.h

@@ -23,11 +23,21 @@ public:
     virtual ~TcpServer() {
     }
 
+    EventLoopPtr loop(int idx = -1) {
+        return loop_threads.loop(idx);
+    }
+
     //@retval >=0 listenfd, <0 error
     int createsocket(int port, const char* host = "0.0.0.0") {
         listenfd = Listen(port, host);
         return listenfd;
     }
+    void closesocket() {
+        if (listenfd >= 0) {
+            ::closesocket(listenfd);
+            listenfd = -1;
+        }
+    }
 
     void setMaxConnectionNum(uint32_t num) {
         max_connections = num;
@@ -35,27 +45,25 @@ public:
     void setThreadNum(int num) {
         loop_threads.setThreadNum(num);
     }
+
+    void startAccept(const EventLoopPtr& loop) {
+        assert(listenfd >= 0);
+        hio_t* listenio = haccept(loop->loop(), listenfd, onAccept);
+        hevent_set_userdata(listenio, this);
+        if (tls) {
+            hio_enable_ssl(listenio);
+        }
+    }
+
     void start(bool wait_threads_started = true) {
         loop_threads.start(wait_threads_started, [this](const EventLoopPtr& loop){
-            assert(listenfd >= 0);
-            hio_t* listenio = haccept(loop->loop(), listenfd, onAccept);
-            hevent_set_userdata(listenio, this);
-            if (tls) {
-                hio_enable_ssl(listenio);
-            }
+            startAccept(loop);
         });
     }
     void stop(bool wait_threads_stopped = true) {
         loop_threads.stop(wait_threads_stopped);
     }
 
-    EventLoopPtr loop(int idx = -1) {
-        return loop_threads.loop(idx);
-    }
-    hloop_t* hloop(int idx = -1) {
-        return loop_threads.hloop(idx);
-    }
-
     int withTLS(const char* cert_file, const char* key_file) {
         if (cert_file) {
             hssl_ctx_init_param_t param;

+ 29 - 26
evpp/UdpClient.h

@@ -28,43 +28,45 @@ public:
         channel.reset(new SocketChannel(io));
         return channel->fd();
     }
+    void closesocket() {
+        if (channel) {
+            channel->close();
+            channel = NULL;
+        }
+    }
 
-    void start(bool wait_threads_started = true) {
-        loop_thread.start(wait_threads_started,
-            [this]() {
-                assert(channel != NULL);
-                channel->onread = [this](Buffer* buf) {
-                    if (onMessage) {
-                        onMessage(channel, buf);
-                    }
-                };
-                channel->onwrite = [this](Buffer* buf) {
-                    if (onWriteComplete) {
-                        onWriteComplete(channel, buf);
-                    }
-                };
-                channel->startRead();
-                return 0;
+    int startRecv() {
+        assert(channel != NULL);
+        channel->onread = [this](Buffer* buf) {
+            if (onMessage) {
+                onMessage(channel, buf);
+            }
+        };
+        channel->onwrite = [this](Buffer* buf) {
+            if (onWriteComplete) {
+                onWriteComplete(channel, buf);
             }
-        );
+        };
+        return channel->startRead();
+    }
+
+    void start(bool wait_threads_started = true) {
+        loop_thread.start(wait_threads_started, std::bind(&UdpClient::startRecv, this));
     }
     void stop(bool wait_threads_stopped = true) {
         loop_thread.stop(wait_threads_stopped);
     }
 
-    int sendto(const void* data, int size) {
+    int sendto(const void* data, int size, struct sockaddr* peeraddr = NULL) {
         if (channel == NULL) return -1;
+        if (peeraddr) hio_set_peeraddr(channel->io(), peeraddr, SOCKADDR_LEN(peeraddr));
         return channel->write(data, size);
     }
-
-    int sendto(Buffer* buf) {
-        if (channel == NULL) return -1;
-        return channel->write(buf);
+    int sendto(Buffer* buf, struct sockaddr* peeraddr = NULL) {
+        return sendto(buf->data(), buf->size(), peeraddr);
     }
-
-    int sendto(const std::string& str) {
-        if (channel == NULL) return -1;
-        return channel->write(str);
+    int sendto(const std::string& str, struct sockaddr* peeraddr = NULL) {
+        return sendto(str.data(), str.size(), peeraddr);
     }
 
 public:
@@ -72,6 +74,7 @@ public:
     // Callback
     MessageCallback         onMessage;
     WriteCompleteCallback   onWriteComplete;
+
 private:
     EventLoopThread         loop_thread;
 };

+ 29 - 24
evpp/UdpServer.h

@@ -28,40 +28,45 @@ public:
         channel.reset(new SocketChannel(io));
         return channel->fd();
     }
+    void closesocket() {
+        if (channel) {
+            channel->close();
+            channel = NULL;
+        }
+    }
 
-    void start(bool wait_threads_started = true) {
-        loop_thread.start(wait_threads_started,
-            [this]() {
-                assert(channel != NULL);
-                channel->onread = [this](Buffer* buf) {
-                    if (onMessage) {
-                        onMessage(channel, buf);
-                    }
-                };
-                channel->onwrite = [this](Buffer* buf) {
-                    if (onWriteComplete) {
-                        onWriteComplete(channel, buf);
-                    }
-                };
-                channel->startRead();
-                return 0;
+    int startRecv() {
+        assert(channel != NULL);
+        channel->onread = [this](Buffer* buf) {
+            if (onMessage) {
+                onMessage(channel, buf);
+            }
+        };
+        channel->onwrite = [this](Buffer* buf) {
+            if (onWriteComplete) {
+                onWriteComplete(channel, buf);
             }
-        );
+        };
+        return channel->startRead();
+    }
+
+    void start(bool wait_threads_started = true) {
+        loop_thread.start(wait_threads_started, std::bind(&UdpServer::startRecv, this));
     }
     void stop(bool wait_threads_stopped = true) {
         loop_thread.stop(wait_threads_stopped);
     }
 
-    int sendto(Buffer* buf, struct sockaddr* peeraddr = NULL) {
-        if (channel == NULL) return 0;
+    int sendto(const void* data, int size, struct sockaddr* peeraddr = NULL) {
+        if (channel == NULL) return -1;
         if (peeraddr) hio_set_peeraddr(channel->io(), peeraddr, SOCKADDR_LEN(peeraddr));
-        return channel->write(buf);
+        return channel->write(data, size);
+    }
+    int sendto(Buffer* buf, struct sockaddr* peeraddr = NULL) {
+        return sendto(buf->data(), buf->size(), peeraddr);
     }
-
     int sendto(const std::string& str, struct sockaddr* peeraddr = NULL) {
-        if (channel == NULL) return 0;
-        if (peeraddr) hio_set_peeraddr(channel->io(), peeraddr, SOCKADDR_LEN(peeraddr));
-        return channel->write(str);
+        return sendto(str.data(), str.size(), peeraddr);
     }
 
 public: