Kaynağa Gözat

fix: call hio_write in write_cb, iter maybe invalid

ithewei 4 yıl önce
ebeveyn
işleme
a69cd46cdf
2 değiştirilmiş dosya ile 22 ekleme ve 12 silme
  1. 4 3
      event/hevent.h
  2. 18 9
      event/nio.c

+ 4 - 3
event/hevent.h

@@ -184,10 +184,11 @@ struct hio_s {
  * hio lifeline:
  *
  * fd =>
- * hio_get => HV_ALLOC_SIZEOF(io) => hio_init =>
+ * hio_get => HV_ALLOC_SIZEOF(io) => hio_init => hio_ready
  *
- * hio_ready => hio_add => hio_read_cb/hio_write_cb =>
- * hio_close => hio_done => hio_close_cb =>
+ * hio_read  => hio_add(HV_READ) => hio_read_cb
+ * hio_write => hio_add(HV_WRITE) => hio_write_cb
+ * hio_close => hio_done => hio_del(HV_RDWR) => hio_close_cb
  *
  * hloop_stop => hloop_free => hio_free => HV_FREE(io)
  */

+ 18 - 9
event/nio.c

@@ -313,14 +313,14 @@ write:
         return;
     }
     offset_buf_t* pbuf = write_queue_front(&io->write_queue);
-    char* buf = pbuf->base + pbuf->offset;
+    char* base = pbuf->base;
+    char* buf = base + pbuf->offset;
     int len = pbuf->len - pbuf->offset;
     nwrite = __nio_write(io, buf, len);
     // printd("write retval=%d\n", nwrite);
     if (nwrite < 0) {
         err = socket_errno();
         if (err == EAGAIN) {
-            //goto write_done;
             hrecursive_mutex_unlock(&io->write_mutex);
             return;
         } else {
@@ -336,10 +336,14 @@ write:
     io->write_bufsize -= nwrite;
     __write_cb(io, buf, nwrite);
     if (nwrite == len) {
-        HV_FREE(pbuf->base);
+        // NOTE: after write_cb, pbuf maybe invalid.
+        // HV_FREE(pbuf->base);
+        HV_FREE(base);
         write_queue_pop_front(&io->write_queue);
-        // write next
-        goto write;
+        if (!io->closed) {
+            // write continue
+            goto write;
+        }
     }
     hrecursive_mutex_unlock(&io->write_mutex);
     return;
@@ -488,10 +492,10 @@ enqueue:
         }
     }
 write_done:
+    hrecursive_mutex_unlock(&io->write_mutex);
     if (nwrite > 0) {
         __write_cb(io, buf, nwrite);
     }
-    hrecursive_mutex_unlock(&io->write_mutex);
     return nwrite;
 write_error:
 disconnect:
@@ -510,18 +514,24 @@ int hio_close (hio_t* io) {
     if (hv_gettid() != io->loop->tid) {
         return hio_close_async(io);
     }
+
     hrecursive_mutex_lock(&io->write_mutex);
-    if (!write_queue_empty(&io->write_queue) && io->error == 0 && io->close == 0) {
+    if (io->closed) {
         hrecursive_mutex_unlock(&io->write_mutex);
+        return 0;
+    }
+    if (!write_queue_empty(&io->write_queue) && io->error == 0 && io->close == 0) {
         io->close = 1;
+        hrecursive_mutex_unlock(&io->write_mutex);
         hlogw("write_queue not empty, close later.");
         int timeout_ms = io->close_timeout ? io->close_timeout : HIO_DEFAULT_CLOSE_TIMEOUT;
         io->close_timer = htimer_add(io->loop, __close_timeout_cb, timeout_ms, 1);
         io->close_timer->privdata = io;
         return 0;
     }
-
     io->closed = 1;
+    hrecursive_mutex_unlock(&io->write_mutex);
+
     hio_done(io);
     __close_cb(io);
     if (io->ssl) {
@@ -531,7 +541,6 @@ int hio_close (hio_t* io) {
     if (io->io_type & HIO_TYPE_SOCKET) {
         closesocket(io->fd);
     }
-    hrecursive_mutex_unlock(&io->write_mutex);
     return 0;
 }
 #endif