Parcourir la source

eventfd > pipe > socketpair

ithewei il y a 3 ans
Parent
commit
600cbe243a
6 fichiers modifiés avec 92 ajouts et 37 suppressions
  1. 1 0
      CMakeLists.txt
  2. 28 9
      configure
  3. 1 1
      event/hevent.h
  4. 54 27
      event/hloop.c
  5. 4 0
      hconfig.h
  6. 4 0
      hconfig.h.in

+ 1 - 0
CMakeLists.txt

@@ -59,6 +59,7 @@ check_function("pthread_mutex_timedlock" "pthread.h")
 check_function("sem_timedwait" "semaphore.h")
 check_function("pipe" "unistd.h")
 check_function("socketpair" "sys/socket.h")
+check_function("eventfd" "sys/eventfd.h")
 
 configure_file(${CMAKE_CURRENT_SOURCE_DIR}/hconfig.h.in ${CMAKE_CURRENT_SOURCE_DIR}/hconfig.h)
 

+ 28 - 9
configure

@@ -160,8 +160,7 @@ cat << END >> hconfig.h
 END
 }
 
-check_header() {
-echo -n "checking for $header... "
+CheckHeaderExists() {
 rm tmp/check 2>/dev/null
 cat << END > tmp/check.c
 #include <$header>
@@ -174,17 +173,16 @@ END
 $CC -o tmp/check tmp/check.c 2>/dev/null
 if [ -x tmp/check ]; then
     value=1
-    echo "yes"
 else
     value=0
-    echo "no"
 fi
-macro=HAVE_$(echo $header | tr a-z./ A-Z__)
-write_define
 }
 
-check_function() {
-echo -n "checking for $function... "
+CheckSymbolExists() {
+CheckHeaderExists
+if [ $value -eq 0 ]; then
+    return;
+fi
 rm tmp/check 2>/dev/null
 cat << END > tmp/check.c
 #include <$header>
@@ -199,9 +197,29 @@ END
 $CC -o tmp/check tmp/check.c 2>/dev/null
 if [ -x tmp/check ]; then
     value=0
-    echo "no"
 else
     value=1
+fi
+}
+
+check_header() {
+echo -n "checking for $header... "
+CheckHeaderExists
+if [ $value -eq 0 ]; then
+    echo "no"
+else
+    echo "yes"
+fi
+macro=HAVE_$(echo $header | tr a-z./ A-Z__)
+write_define
+}
+
+check_function() {
+echo -n "checking for $function... "
+CheckSymbolExists
+if [ $value -eq 0 ]; then
+    echo "no"
+else
     echo "yes"
 fi
 macro=HAVE_$(echo $function | tr a-z A-Z)
@@ -247,6 +265,7 @@ function=pthread_mutex_timedlock && header=pthread.h && check_function
 function=sem_timedwait && header=semaphore.h && check_function
 function=pipe && header=unistd.h && check_function
 function=socketpair && header=sys/socket.h && check_function
+function=eventfd && header=sys/eventfd.h && check_function
 
 # Checks for options
 source config.mk 2>/dev/null

+ 1 - 1
event/hevent.h

@@ -58,7 +58,7 @@ struct hloop_s {
     hbuf_t                      readbuf;
     void*                       iowatcher;
     // custom_events
-    int                         sockpair[2];
+    int                         eventfds[2];
     event_queue                 custom_events;
     hmutex_t                    custom_events_mutex;
 };

+ 54 - 27
event/hloop.c

@@ -10,6 +10,10 @@
 #include "hsocket.h"
 #include "hthread.h"
 
+#if defined(OS_UNIX) && HAVE_EVENTFD
+#include "sys/eventfd.h"
+#endif
+
 #define HLOOP_PAUSE_TIME        10      // ms
 #define HLOOP_MAX_BLOCK_TIME    100     // ms
 #define HLOOP_STAT_TIMEOUT      60000   // ms
@@ -17,8 +21,8 @@
 #define IO_ARRAY_INIT_SIZE              1024
 #define CUSTOM_EVENT_QUEUE_INIT_SIZE    16
 
-#define SOCKPAIR_READ_INDEX     0
-#define SOCKPAIR_WRITE_INDEX    1
+#define EVENTFDS_READ_INDEX     0
+#define EVENTFDS_WRITE_INDEX    1
 
 static void __hidle_del(hidle_t* idle);
 static void __htimer_del(htimer_t* timer);
@@ -181,11 +185,16 @@ static void hloop_stat_timer_cb(htimer_t* timer) {
         loop->nactives, loop->nios, loop->ntimers, loop->nidles);
 }
 
-static void sockpair_read_cb(hio_t* io, void* buf, int readbytes) {
+static void eventfd_read_cb(hio_t* io, void* buf, int readbytes) {
     hloop_t* loop = io->loop;
     hevent_t* pev = NULL;
     hevent_t ev;
-    for (int i = 0; i < readbytes; ++i) {
+    uint64_t count = readbytes;
+#if defined(OS_UNIX) && HAVE_EVENTFD
+    assert(readbytes == sizeof(count));
+    count = *(uint64_t*)buf;
+#endif
+    for (uint64_t i = 0; i < count; ++i) {
         hmutex_lock(&loop->custom_events_mutex);
         if (event_queue_empty(&loop->custom_events)) {
             goto unlock;
@@ -207,29 +216,44 @@ unlock:
     hmutex_unlock(&loop->custom_events_mutex);
 }
 
-static int hloop_create_sockpair(hloop_t* loop) {
-#if defined(OS_UNIX) && HAVE_PIPE
-    if (pipe(loop->sockpair) != 0) {
+static int hloop_create_eventfds(hloop_t* loop) {
+#if defined(OS_UNIX) && HAVE_EVENTFD
+    int efd = eventfd(0, 0);
+    if (efd < 0) {
+        hloge("eventfd create failed!");
+        return -1;
+    }
+    loop->eventfds[0] = loop->eventfds[1] = efd;
+#elif defined(OS_UNIX) && HAVE_PIPE
+    if (pipe(loop->eventfds) != 0) {
         hloge("pipe create failed!");
         return -1;
     }
 #else
-    if (Socketpair(AF_INET, SOCK_STREAM, 0, loop->sockpair) != 0) {
+    if (Socketpair(AF_INET, SOCK_STREAM, 0, loop->eventfds) != 0) {
         hloge("socketpair create failed!");
         return -1;
     }
 #endif
-    hio_t* io = hread(loop, loop->sockpair[SOCKPAIR_READ_INDEX], loop->readbuf.base, loop->readbuf.len, sockpair_read_cb);
+    hio_t* io = hread(loop, loop->eventfds[EVENTFDS_READ_INDEX], loop->readbuf.base, loop->readbuf.len, eventfd_read_cb);
     io->priority = HEVENT_HIGH_PRIORITY;
-    // NOTE: Avoid duplication closesocket in hio_cleanup
-    loop->sockpair[SOCKPAIR_READ_INDEX] = -1;
     ++loop->intern_nevents;
     return 0;
 }
 
-static void hloop_destroy_sockpair(hloop_t* loop) {
-    SAFE_CLOSESOCKET(loop->sockpair[SOCKPAIR_READ_INDEX]);
-    SAFE_CLOSESOCKET(loop->sockpair[SOCKPAIR_WRITE_INDEX]);
+static void hloop_destroy_eventfds(hloop_t* loop) {
+#if defined(OS_UNIX) && HAVE_EVENTFD
+    // NOTE: eventfd has only one fd
+    SAFE_CLOSE(loop->eventfds[0]);
+#elif defined(OS_UNIX) && HAVE_PIPE
+    SAFE_CLOSE(loop->eventfds[0]);
+    SAFE_CLOSE(loop->eventfds[1]);
+#else
+    // NOTE: Avoid duplication closesocket in hio_cleanup
+    // SAFE_CLOSESOCKET(loop->eventfds[EVENTFDS_READ_INDEX]);
+    SAFE_CLOSESOCKET(loop->eventfds[EVENTFDS_WRITE_INDEX]);
+#endif
+    loop->eventfds[0] = loop->eventfds[1] = -1;
 }
 
 void hloop_post_event(hloop_t* loop, hevent_t* ev) {
@@ -243,20 +267,23 @@ void hloop_post_event(hloop_t* loop, hevent_t* ev) {
         ev->event_id = hloop_next_event_id();
     }
 
-    int nsend = 0;
+    int nwrite = 0;
+    uint64_t count = 1;
     hmutex_lock(&loop->custom_events_mutex);
-    if (loop->sockpair[SOCKPAIR_WRITE_INDEX] == -1) {
-        if (hloop_create_sockpair(loop) != 0) {
+    if (loop->eventfds[EVENTFDS_WRITE_INDEX] == -1) {
+        if (hloop_create_eventfds(loop) != 0) {
             goto unlock;
         }
     }
-#if defined(OS_UNIX) && HAVE_PIPE
-    nsend = write(loop->sockpair[SOCKPAIR_WRITE_INDEX], "e", 1);
+#if defined(OS_UNIX) && HAVE_EVENTFD
+    nwrite = write(loop->eventfds[EVENTFDS_WRITE_INDEX], &count, sizeof(count));
+#elif defined(OS_UNIX) && HAVE_PIPE
+    nwrite = write(loop->eventfds[EVENTFDS_WRITE_INDEX], "e", 1);
 #else
-    nsend =  send(loop->sockpair[SOCKPAIR_WRITE_INDEX], "e", 1, 0);
+    nwrite =  send(loop->eventfds[EVENTFDS_WRITE_INDEX], "e", 1, 0);
 #endif
-    if (nsend != 1) {
-        hloge("send failed!");
+    if (nwrite <= 0) {
+        hloge("hloop_post_event failed!");
         goto unlock;
     }
     event_queue_push_back(&loop->custom_events, ev);
@@ -296,8 +323,8 @@ static void hloop_init(hloop_t* loop) {
     // custom_events
     hmutex_init(&loop->custom_events_mutex);
     event_queue_init(&loop->custom_events, CUSTOM_EVENT_QUEUE_INIT_SIZE);
-    // NOTE: hloop_create_sockpair when hloop_post_event or hloop_run
-    loop->sockpair[0] = loop->sockpair[1] = -1;
+    // NOTE: hloop_create_eventfds when hloop_post_event or hloop_run
+    loop->eventfds[0] = loop->eventfds[1] = -1;
 
     // NOTE: init start_time here, because htimer_add use it.
     loop->start_ms = gettimeofday_ms();
@@ -354,7 +381,7 @@ static void hloop_cleanup(hloop_t* loop) {
 
     // custom_events
     hmutex_lock(&loop->custom_events_mutex);
-    hloop_destroy_sockpair(loop);
+    hloop_destroy_eventfds(loop);
     event_queue_cleanup(&loop->custom_events);
     hmutex_unlock(&loop->custom_events_mutex);
     hmutex_destroy(&loop->custom_events_mutex);
@@ -387,8 +414,8 @@ int hloop_run(hloop_t* loop) {
 
     if (loop->intern_nevents == 0) {
         hmutex_lock(&loop->custom_events_mutex);
-        if (loop->sockpair[SOCKPAIR_WRITE_INDEX] == -1) {
-            hloop_create_sockpair(loop);
+        if (loop->eventfds[EVENTFDS_WRITE_INDEX] == -1) {
+            hloop_create_eventfds(loop);
         }
         hmutex_unlock(&loop->custom_events_mutex);
 

+ 4 - 0
hconfig.h

@@ -73,6 +73,10 @@
 #define HAVE_SOCKETPAIR 1
 #endif
 
+#ifndef HAVE_EVENTFD
+#define HAVE_EVENTFD 1
+#endif
+
 /* #undef WITH_OPENSSL */
 /* #undef WITH_GNUTLS */
 /* #undef WITH_MBEDTLS */

+ 4 - 0
hconfig.h.in

@@ -73,6 +73,10 @@
 #define HAVE_SOCKETPAIR @HAVE_SOCKETPAIR@
 #endif
 
+#ifndef HAVE_EVENTFD
+#define HAVE_EVENTFD @HAVE_EVENTFD@
+#endif
+
 #cmakedefine WITH_OPENSSL   1
 #cmakedefine WITH_GNUTLS    1
 #cmakedefine WITH_MBEDTLS   1