Просмотр исходного кода

feat: support nonblocking sendto multiple peeraddr (#707)

ithewei 8 месяцев назад
Родитель
Сommit
5001c07f80
3 измененных файлов с 24 добавлено и 17 удалено
  1. 1 6
      event/hevent.c
  2. 21 9
      event/nio.c
  3. 2 2
      event/overlapio.c

+ 1 - 6
event/hevent.c

@@ -43,12 +43,7 @@ static void fill_io_type(hio_t* io) {
 }
 }
 
 
 static void hio_socket_init(hio_t* io) {
 static void hio_socket_init(hio_t* io) {
-    if ((io->io_type & HIO_TYPE_SOCK_DGRAM) || (io->io_type & HIO_TYPE_SOCK_RAW)) {
-        // NOTE: sendto multiple peeraddr cannot use io->write_queue
-        blocking(io->fd);
-    } else {
-        nonblocking(io->fd);
-    }
+    nonblocking(io->fd);
     // fill io->localaddr io->peeraddr
     // fill io->localaddr io->peeraddr
     if (io->localaddr == NULL) {
     if (io->localaddr == NULL) {
         HV_ALLOC(io->localaddr, sizeof(sockaddr_u));
         HV_ALLOC(io->localaddr, sizeof(sockaddr_u));

+ 21 - 9
event/nio.c

@@ -372,7 +372,11 @@ write:
     char* base = pbuf->base;
     char* base = pbuf->base;
     char* buf = base + pbuf->offset;
     char* buf = base + pbuf->offset;
     int len = pbuf->len - pbuf->offset;
     int len = pbuf->len - pbuf->offset;
-    nwrite = __nio_write(io, buf, len, NULL);
+    struct sockaddr* addr = NULL;
+    if (io->io_type & (HIO_TYPE_SOCK_DGRAM | HIO_TYPE_SOCK_RAW)) {
+        addr = (struct sockaddr*)base;
+    }
+    nwrite = __nio_write(io, buf, len, addr);
     // printd("write retval=%d\n", nwrite);
     // printd("write retval=%d\n", nwrite);
     if (nwrite < 0) {
     if (nwrite < 0) {
         err = socket_errno();
         err = socket_errno();
@@ -526,26 +530,34 @@ enqueue:
         hio_add(io, hio_handle_events, HV_WRITE);
         hio_add(io, hio_handle_events, HV_WRITE);
     }
     }
     if (nwrite < len) {
     if (nwrite < len) {
-        if (io->write_bufsize + len - nwrite > io->max_write_bufsize) {
+        size_t unwritten_len = len - nwrite;
+        if (io->write_bufsize + unwritten_len > io->max_write_bufsize) {
             hloge("write bufsize > %u, close it!", io->max_write_bufsize);
             hloge("write bufsize > %u, close it!", io->max_write_bufsize);
             io->error = ERR_OVER_LIMIT;
             io->error = ERR_OVER_LIMIT;
             goto write_error;
             goto write_error;
         }
         }
+        size_t addrlen = 0;
+        if ((io->io_type & (HIO_TYPE_SOCK_DGRAM | HIO_TYPE_SOCK_RAW)) && addr) {
+            addrlen = SOCKADDR_LEN(addr);
+        }
         offset_buf_t remain;
         offset_buf_t remain;
-        remain.len = len - nwrite;
-        remain.offset = 0;
+        remain.offset = addrlen;
+        remain.len = addrlen + unwritten_len;
         // NOTE: free in nio_write
         // NOTE: free in nio_write
         HV_ALLOC(remain.base, remain.len);
         HV_ALLOC(remain.base, remain.len);
-        memcpy(remain.base, ((char*)buf) + nwrite, remain.len);
+        if (addr && addrlen > 0) {
+            memcpy(remain.base, addr, addrlen);
+        }
+        memcpy(remain.base + remain.offset, ((char*)buf) + nwrite, unwritten_len);
         if (io->write_queue.maxsize == 0) {
         if (io->write_queue.maxsize == 0) {
             write_queue_init(&io->write_queue, 4);
             write_queue_init(&io->write_queue, 4);
         }
         }
         write_queue_push_back(&io->write_queue, &remain);
         write_queue_push_back(&io->write_queue, &remain);
-        io->write_bufsize += remain.len;
+        io->write_bufsize += unwritten_len;
         if (io->write_bufsize > WRITE_BUFSIZE_HIGH_WATER) {
         if (io->write_bufsize > WRITE_BUFSIZE_HIGH_WATER) {
             hlogw("write len=%u enqueue %u, bufsize=%u over high water %u",
             hlogw("write len=%u enqueue %u, bufsize=%u over high water %u",
                 (unsigned int)len,
                 (unsigned int)len,
-                (unsigned int)(remain.len - remain.offset),
+                (unsigned int)unwritten_len,
                 (unsigned int)io->write_bufsize,
                 (unsigned int)io->write_bufsize,
                 (unsigned int)WRITE_BUFSIZE_HIGH_WATER);
                 (unsigned int)WRITE_BUFSIZE_HIGH_WATER);
         }
         }
@@ -571,11 +583,11 @@ disconnect:
 }
 }
 
 
 int hio_write (hio_t* io, const void* buf, size_t len) {
 int hio_write (hio_t* io, const void* buf, size_t len) {
-    return hio_write4(io, buf, len, NULL);
+    return hio_write4(io, buf, len, io->peeraddr);
 }
 }
 
 
 int hio_sendto (hio_t* io, const void* buf, size_t len, struct sockaddr* addr) {
 int hio_sendto (hio_t* io, const void* buf, size_t len, struct sockaddr* addr) {
-    return hio_write4(io, buf, len, addr);
+    return hio_write4(io, buf, len, addr ? addr : io->peeraddr);
 }
 }
 
 
 int hio_close (hio_t* io) {
 int hio_close (hio_t* io) {

+ 2 - 2
event/overlapio.c

@@ -380,11 +380,11 @@ disconnect:
 }
 }
 
 
 int hio_write (hio_t* io, const void* buf, size_t len) {
 int hio_write (hio_t* io, const void* buf, size_t len) {
-    return hio_write4(io, buf, len, NULL);
+    return hio_write4(io, buf, len, io->peeraddr);
 }
 }
 
 
 int hio_sendto (hio_t* io, const void* buf, size_t len, struct sockaddr* addr) {
 int hio_sendto (hio_t* io, const void* buf, size_t len, struct sockaddr* addr) {
-    return hio_write4(io, buf, len, addr);
+    return hio_write4(io, buf, len, addr ? addr : io->peeraddr);
 }
 }
 
 
 int hio_close (hio_t* io) {
 int hio_close (hio_t* io) {