Browse Source

fix: add hio_sendto to replace hio_set_peeraddr,hio_write (#707)

ithewei 8 months ago
parent
commit
3d0233dd0e
6 changed files with 34 additions and 19 deletions
  1. 1 0
      event/hloop.h
  2. 14 5
      event/nio.c
  3. 16 4
      event/overlapio.c
  4. 1 4
      evpp/UdpClient.h
  5. 1 4
      evpp/UdpServer.h
  6. 1 2
      examples/nmap/nmap.cpp

+ 1 - 0
event/hloop.h

@@ -376,6 +376,7 @@ HV_EXPORT int hio_read_remain(hio_t* io);
 // NOTE: hio_write is thread-safe, locked by recursive_mutex, allow to be called by other threads.
 // hio_try_write => hio_add(io, HV_WRITE) => write => hwrite_cb
 HV_EXPORT int hio_write  (hio_t* io, const void* buf, size_t len);
+HV_EXPORT int hio_sendto (hio_t* io, const void* buf, size_t len, struct sockaddr* addr);
 
 // NOTE: hio_close is thread-safe, hio_close_async will be called actually in other thread.
 // hio_del(io, HV_RDWR) => close => hclose_cb

+ 14 - 5
event/nio.c

@@ -269,7 +269,7 @@ static int __nio_read(hio_t* io, void* buf, int len) {
     return nread;
 }
 
-static int __nio_write(hio_t* io, const void* buf, int len) {
+static int __nio_write(hio_t* io, const void* buf, int len, struct sockaddr* addr) {
     int nwrite = 0;
     switch (io->io_type) {
     case HIO_TYPE_SSL:
@@ -288,7 +288,8 @@ static int __nio_write(hio_t* io, const void* buf, int len) {
     case HIO_TYPE_KCP:
     case HIO_TYPE_IP:
     {
-        nwrite = sendto(io->fd, buf, len, 0, io->peeraddr, SOCKADDR_LEN(io->peeraddr));
+        if (addr == NULL) addr = io->peeraddr;
+        nwrite = sendto(io->fd, buf, len, 0, addr, SOCKADDR_LEN(addr));
         if (((sockaddr_u*)io->localaddr)->sin.sin_port == 0) {
             socklen_t addrlen = sizeof(sockaddr_u);
             getsockname(io->fd, io->localaddr, &addrlen);
@@ -371,7 +372,7 @@ write:
     char* base = pbuf->base;
     char* buf = base + pbuf->offset;
     int len = pbuf->len - pbuf->offset;
-    nwrite = __nio_write(io, buf, len);
+    nwrite = __nio_write(io, buf, len, NULL);
     // printd("write retval=%d\n", nwrite);
     if (nwrite < 0) {
         err = socket_errno();
@@ -485,7 +486,7 @@ int hio_read (hio_t* io) {
     return 0;
 }
 
-int hio_write (hio_t* io, const void* buf, size_t len) {
+static int hio_write4 (hio_t* io, const void* buf, size_t len, struct sockaddr* addr) {
     if (io->closed) {
         hloge("hio_write called but fd[%d] already closed!", io->fd);
         return -1;
@@ -501,7 +502,7 @@ int hio_write (hio_t* io, const void* buf, size_t len) {
 #endif
     if (write_queue_empty(&io->write_queue)) {
 try_write:
-        nwrite = __nio_write(io, buf, len);
+        nwrite = __nio_write(io, buf, len, addr);
         // printd("write retval=%d\n", nwrite);
         if (nwrite < 0) {
             err = socket_errno();
@@ -569,6 +570,14 @@ disconnect:
     return nwrite < 0 ? nwrite : -1;
 }
 
+int hio_write (hio_t* io, const void* buf, size_t len) {
+    return hio_write4(io, buf, len, NULL);
+}
+
+int hio_sendto (hio_t* io, const void* buf, size_t len, struct sockaddr* addr) {
+    return hio_write4(io, buf, len, addr);
+}
+
 int hio_close (hio_t* io) {
     if (io->closed) return 0;
     if (io->destroy == 0 && hv_gettid() != io->loop->tid) {

+ 16 - 4
event/overlapio.c

@@ -296,14 +296,15 @@ int hio_read (hio_t* io) {
     return hio_add(io, hio_handle_events, HV_READ);
 }
 
-int hio_write(hio_t* io, const void* buf, size_t len) {
+static int hio_write4 (hio_t* io, const void* buf, size_t len, struct sockaddr* addr) {
     int nwrite = 0;
 try_send:
     if (io->io_type == HIO_TYPE_TCP) {
         nwrite = send(io->fd, buf, len, 0);
     }
     else if (io->io_type == HIO_TYPE_UDP) {
-        nwrite = sendto(io->fd, buf, len, 0, io->peeraddr, sizeof(struct sockaddr_in6));
+        if (addr == NULL) addr = io->peeraddr;
+        nwrite = sendto(io->fd, buf, len, 0, addr, sizeof(struct sockaddr_in6));
     }
     else if (io->io_type == HIO_TYPE_IP) {
         goto WSASend;
@@ -354,7 +355,8 @@ WSASend:
         }
         else if (io->io_type == HIO_TYPE_UDP ||
                  io->io_type == HIO_TYPE_IP) {
-            ret = WSASendTo(io->fd, &hovlp->buf, 1, &dwbytes, flags, io->peeraddr, sizeof(struct sockaddr_in6), &hovlp->ovlp, NULL);
+            if (addr == NULL) addr = io->peeraddr;
+            ret = WSASendTo(io->fd, &hovlp->buf, 1, &dwbytes, flags, addr, sizeof(struct sockaddr_in6), &hovlp->ovlp, NULL);
         }
         else {
             ret = -1;
@@ -371,10 +373,20 @@ WSASend:
     }
 write_error:
 disconnect:
-    hio_close(io);
+    if (io->io_type & HIO_TYPE_SOCK_STREAM) {
+        hio_close(io);
+    }
     return 0;
 }
 
+int hio_write (hio_t* io, const void* buf, size_t len) {
+    return hio_write4(io, buf, len, NULL);
+}
+
+int hio_sendto (hio_t* io, const void* buf, size_t len, struct sockaddr* addr) {
+    return hio_write4(io, buf, len, addr);
+}
+
 int hio_close (hio_t* io) {
     if (io->closed) return 0;
     io->closed = 1;

+ 1 - 4
evpp/UdpClient.h

@@ -113,9 +113,7 @@ public:
     // sendto thread-safe
     int sendto(const void* data, int size, struct sockaddr* peeraddr = NULL) {
         if (channel == NULL) return -1;
-        std::lock_guard<std::mutex> locker(sendto_mutex);
-        if (peeraddr) hio_set_peeraddr(channel->io(), peeraddr, SOCKADDR_LEN(peeraddr));
-        return channel->write(data, size);
+        return hio_sendto(channel->io(), data, size, peeraddr);
     }
     int sendto(Buffer* buf, struct sockaddr* peeraddr = NULL) {
         return sendto(buf->data(), buf->size(), peeraddr);
@@ -152,7 +150,6 @@ public:
     std::function<void(const TSocketChannelPtr&, Buffer*)>  onWriteComplete;
 
 private:
-    std::mutex              sendto_mutex;
     EventLoopPtr            loop_;
 };
 

+ 1 - 4
evpp/UdpServer.h

@@ -89,9 +89,7 @@ public:
     // sendto thread-safe
     int sendto(const void* data, int size, struct sockaddr* peeraddr = NULL) {
         if (channel == NULL) return -1;
-        std::lock_guard<std::mutex> locker(sendto_mutex);
-        if (peeraddr) hio_set_peeraddr(channel->io(), peeraddr, SOCKADDR_LEN(peeraddr));
-        return channel->write(data, size);
+        return hio_sendto(channel->io(), data, size, peeraddr);
     }
     int sendto(Buffer* buf, struct sockaddr* peeraddr = NULL) {
         return sendto(buf->data(), buf->size(), peeraddr);
@@ -126,7 +124,6 @@ public:
     std::function<void(const TSocketChannelPtr&, Buffer*)>  onWriteComplete;
 
 private:
-    std::mutex              sendto_mutex;
     EventLoopPtr            loop_;
 };
 

+ 1 - 2
examples/nmap/nmap.cpp

@@ -115,8 +115,7 @@ int nmap_discover(Nmap* nmap) {
         memset(&peeraddr, 0, addrlen);
         peeraddr.sin_family = AF_INET;
         peeraddr.sin_addr.s_addr = iter->first;
-        hio_set_peeraddr(io, (struct sockaddr*)&peeraddr, addrlen);
-        hsendto(loop, hio_fd(io), sendbuf, sizeof(sendbuf), NULL);
+        hio_sendto(io, sendbuf, sizeof(sendbuf), (struct sockaddr*)&peeraddr);
         ++ctx.send_cnt;
     }