Przeglądaj źródła

hloop_post_event lock for thread-safe

hewei 6 lat temu
rodzic
commit
e56d656326
3 zmienionych plików z 24 dodań i 6 usunięć
  1. 2 0
      event/hevent.h
  2. 21 6
      event/hloop.c
  3. 1 0
      event/hloop.h

+ 2 - 0
event/hevent.h

@@ -8,6 +8,7 @@
 
 #include "hloop.h"
 #include "hbuf.h"
+#include "hmutex.h"
 
 typedef enum {
     HLOOP_STATUS_STOP,
@@ -48,6 +49,7 @@ struct hloop_s {
     int                         sockpair[2];
     char                        readbuf[4];
     event_queue                 custom_events;
+    hmutex_t                    custom_events_mutex;
 };
 
 struct hidle_s {

+ 21 - 6
event/hloop.c

@@ -172,6 +172,7 @@ static void hloop_init(hloop_t* loop) {
     // custom_events: init when hloop_post_event
     //event_queue_init(&loop->custom_events, 4);
     loop->sockpair[0] = loop->sockpair[1] = -1;
+    hmutex_init(&loop->custom_events_mutex);
     // NOTE: init start_time here, because htimer_add use it.
     loop->start_ms = timestamp_ms();
     loop->start_hrtime = loop->cur_hrtime = gethrtime();
@@ -217,12 +218,15 @@ static void hloop_cleanup(hloop_t* loop) {
     // iowatcher
     iowatcher_cleanup(loop);
     // custom_events
+    hmutex_lock(&loop->custom_events_mutex);
     if (loop->sockpair[0] != -1 && loop->sockpair[1] != -1) {
         closesocket(loop->sockpair[0]);
         closesocket(loop->sockpair[1]);
         loop->sockpair[0] = loop->sockpair[1] = -1;
     }
     event_queue_cleanup(&loop->custom_events);
+    hmutex_unlock(&loop->custom_events_mutex);
+    hmutex_destroy(&loop->custom_events_mutex);
 }
 
 hloop_t* hloop_new(int flags) {
@@ -716,27 +720,37 @@ hio_t* create_udp_client(hloop_t* loop, const char* host, int port) {
 
 static void sockpair_read_cb(hio_t* io, void* buf, int readbytes) {
     hloop_t* loop = io->loop;
+    hevent_t* pev = NULL;
+    hevent_t ev;
     for (int i = 0; i < readbytes; ++i) {
+        hmutex_lock(&loop->custom_events_mutex);
         if (event_queue_empty(&loop->custom_events)) {
-            return;
+            goto unlock;
         }
-        hevent_t* pev = event_queue_front(&loop->custom_events);
+        pev = event_queue_front(&loop->custom_events);
         if (pev == NULL) {
-            return;
+            goto unlock;
         }
-        hevent_t ev = *pev;
+        ev = *pev;
         event_queue_pop_front(&loop->custom_events);
+        // NOTE: unlock before cb, avoid deadlock if hloop_post_event called in cb.
+        hmutex_unlock(&loop->custom_events_mutex);
         if (ev.cb) {
             ev.cb(&ev);
         }
     }
+    return;
+unlock:
+    hmutex_unlock(&loop->custom_events_mutex);
 }
 
 void hloop_post_event(hloop_t* loop, hevent_t* ev) {
+    char buf = '1';
+    hmutex_lock(&loop->custom_events_mutex);
     if (loop->sockpair[0] <= 0 && loop->sockpair[1] <= 0) {
         if (Socketpair(AF_INET, SOCK_STREAM, 0, loop->sockpair) != 0) {
             hloge("socketpair error");
-            return;
+            goto unlock;
         }
         hread(loop, loop->sockpair[1], loop->readbuf, sizeof(loop->readbuf), sockpair_read_cb);
     }
@@ -753,6 +767,7 @@ void hloop_post_event(hloop_t* loop, hevent_t* ev) {
         ev->event_id = ++loop->event_counter;
     }
     event_queue_push_back(&loop->custom_events, ev);
-    char buf = '1';
     hwrite(loop, loop->sockpair[0], &buf, 1, NULL);
+unlock:
+    hmutex_unlock(&loop->custom_events_mutex);
 }

+ 1 - 0
event/hloop.h

@@ -117,6 +117,7 @@ void* hloop_userdata(hloop_t* loop);
  * ev.userdata = userdata;
  * hloop_post_event(loop, &ev);
  */
+// NOTE: hloop_post_event is thread-safe
 void hloop_post_event(hloop_t* loop, hevent_t* ev);
 
 // idle