فهرست منبع

hio_t add write_mutex

ithewei 5 سال پیش
والد
کامیت
d39bbd3ac8
4فایلهای تغییر یافته به همراه22 افزوده شده و 1 حذف شده
  1. 1 0
      event/hevent.h
  2. 5 0
      event/hloop.c
  3. 2 1
      event/hloop.h
  4. 14 0
      event/nio.c

+ 1 - 0
event/hevent.h

@@ -107,6 +107,7 @@ struct hio_s {
     struct sockaddr*    peeraddr;
     hbuf_t              readbuf;        // for hread
     struct write_queue  write_queue;    // for hwrite
+    hrecursive_mutex_t  write_mutex;    // lock write and write_queue
     // callbacks
     hread_cb    read_cb;
     hwrite_cb   write_cb;

+ 5 - 0
event/hloop.c

@@ -633,6 +633,8 @@ void hio_init(hio_t* io) {
 
     // write_queue init when hwrite try_write failed
     // write_queue_init(&io->write_queue, 4);
+
+    hrecursive_mutex_init(&io->write_mutex);
 }
 
 void hio_ready(hio_t* io) {
@@ -683,12 +685,14 @@ void hio_done(hio_t* io) {
     hio_del(io, HV_RDWR);
 
     offset_buf_t* pbuf = NULL;
+    hrecursive_mutex_lock(&io->write_mutex);
     while (!write_queue_empty(&io->write_queue)) {
         pbuf = write_queue_front(&io->write_queue);
         HV_FREE(pbuf->base);
         write_queue_pop_front(&io->write_queue);
     }
     write_queue_cleanup(&io->write_queue);
+    hrecursive_mutex_unlock(&io->write_mutex);
 }
 
 void hio_free(hio_t* io) {
@@ -697,6 +701,7 @@ void hio_free(hio_t* io) {
     hio_done(io);
     // NOTE: call hio_close to call hclose_cb
     hio_close(io);
+    hrecursive_mutex_destroy(&io->write_mutex);
     HV_FREE(io->localaddr);
     HV_FREE(io->peeraddr);
     HV_FREE(io);

+ 2 - 1
event/hloop.h

@@ -146,7 +146,7 @@ HV_EXPORT void* hloop_userdata(hloop_t* loop);
  * ev.userdata = userdata;
  * hloop_post_event(loop, &ev);
  */
-// NOTE: hloop_post_event is thread-safe
+// NOTE: hloop_post_event is thread-safe, used to post event from other thread to loop thread.
 HV_EXPORT void hloop_post_event(hloop_t* loop, hevent_t* ev);
 
 // idle
@@ -250,6 +250,7 @@ HV_EXPORT int hio_connect(hio_t* io);
 HV_EXPORT int hio_read   (hio_t* io);
 #define hio_read_start(io) hio_read(io)
 #define hio_read_stop(io)  hio_del(io, HV_READ)
+// NOTE: hio_write is thread-safe, locked by recursive_mutex, allow to be called by other threads.
 // hio_try_write => hio_add(io, HV_WRITE) => write => hwrite_cb
 HV_EXPORT int hio_write  (hio_t* io, const void* buf, size_t len);
 // hio_del(io, HV_RDWR) => close => hclose_cb

+ 14 - 0
event/nio.c

@@ -388,8 +388,10 @@ disconnect:
 static void nio_write(hio_t* io) {
     //printd("nio_write fd=%d\n", io->fd);
     int nwrite = 0;
+    hrecursive_mutex_lock(&io->write_mutex);
 write:
     if (write_queue_empty(&io->write_queue)) {
+        hrecursive_mutex_unlock(&io->write_mutex);
         if (io->close) {
             io->close = 0;
             hio_close(io);
@@ -404,6 +406,7 @@ write:
     if (nwrite < 0) {
         if (socket_errno() == EAGAIN) {
             //goto write_done;
+            hrecursive_mutex_unlock(&io->write_mutex);
             return;
         }
         else {
@@ -423,9 +426,11 @@ write:
         // write next
         goto write;
     }
+    hrecursive_mutex_unlock(&io->write_mutex);
     return;
 write_error:
 disconnect:
+    hrecursive_mutex_unlock(&io->write_mutex);
     hio_close(io);
 }
 
@@ -441,10 +446,12 @@ static void hio_handle_events(hio_t* io) {
 
     if ((io->events & HV_WRITE) && (io->revents & HV_WRITE)) {
         // NOTE: del HV_WRITE, if write_queue empty
+        hrecursive_mutex_lock(&io->write_mutex);
         if (write_queue_empty(&io->write_queue)) {
             iowatcher_del_event(io->loop, io->fd, HV_WRITE);
             io->events &= ~HV_WRITE;
         }
+        hrecursive_mutex_unlock(&io->write_mutex);
         if (io->connect) {
             // NOTE: connect just do once
             // ONESHOT
@@ -503,6 +510,7 @@ int hio_write (hio_t* io, const void* buf, size_t len) {
         return -1;
     }
     int nwrite = 0;
+    hrecursive_mutex_lock(&io->write_mutex);
     if (write_queue_empty(&io->write_queue)) {
 try_write:
         nwrite = __nio_write(io, buf, len);
@@ -525,6 +533,7 @@ try_write:
         __write_cb(io, buf, nwrite);
         if (nwrite == len) {
             //goto write_done;
+            hrecursive_mutex_unlock(&io->write_mutex);
             return nwrite;
         }
 enqueue:
@@ -542,16 +551,20 @@ enqueue:
         }
         write_queue_push_back(&io->write_queue, &rest);
     }
+    hrecursive_mutex_unlock(&io->write_mutex);
     return nwrite;
 write_error:
 disconnect:
+    hrecursive_mutex_unlock(&io->write_mutex);
     hio_close(io);
     return nwrite;
 }
 
 int hio_close (hio_t* io) {
     if (io->closed) return 0;
+    hrecursive_mutex_lock(&io->write_mutex);
     if (!write_queue_empty(&io->write_queue) && io->error == 0 && io->close == 0) {
+        hrecursive_mutex_unlock(&io->write_mutex);
         io->close = 1;
         hlogw("write_queue not empty, close later.");
         int timeout_ms = io->close_timeout ? io->close_timeout : HIO_DEFAULT_CLOSE_TIMEOUT;
@@ -559,6 +572,7 @@ int hio_close (hio_t* io) {
         io->close_timer->privdata = io;
         return 0;
     }
+    hrecursive_mutex_unlock(&io->write_mutex);
 
     io->closed = 1;
     hio_done(io);