Sfoglia il codice sorgente

hio_close thread-safe

ithewei 5 anni fa
parent
commit
011ecc207c
5 ha cambiato i file con 45 aggiunte e 0 eliminazioni
  1. 9 0
      event/hevent.c
  2. 2 0
      event/hevent.h
  3. 11 0
      event/hloop.c
  4. 5 0
      event/hloop.h
  5. 18 0
      event/nio.c

+ 9 - 0
event/hevent.c

@@ -7,6 +7,15 @@ uint64_t hloop_next_event_id() {
     return ++s_id;
 }
 
+uint32_t hio_next_id() {
+    static hatomic_t s_id = HATOMIC_VAR_INIT(0);
+    return ++s_id;
+}
+
+uint32_t hio_id (hio_t* io) {
+    return io->id;
+}
+
 int hio_fd(hio_t* io) {
     return io->fd;
 }

+ 2 - 0
event/hevent.h

@@ -98,6 +98,7 @@ struct hio_s {
     unsigned    sendto      :1;
     unsigned    close       :1;
 // public:
+    uint32_t    id; // fd cannot be used as unique identifier, so we provide an id
     int         fd;
     hio_type_e  io_type;
     int         error;
@@ -141,6 +142,7 @@ void hio_init(hio_t* io);
 void hio_ready(hio_t* io);
 void hio_done(hio_t* io);
 void hio_free(hio_t* io);
+uint32_t hio_next_id();
 
 #define EVENT_ENTRY(p)          container_of(p, hevent_t, pending_node)
 #define IDLE_ENTRY(p)           container_of(p, hidle_t,  node)

+ 11 - 0
event/hloop.c

@@ -647,6 +647,7 @@ void hio_ready(hio_t* io) {
     io->recvfrom = io->sendto = 0;
     io->close = 0;
     // public:
+    io->id = hio_next_id();
     io->io_type = HIO_TYPE_UNKNOWN;
     io->error = 0;
     io->events = io->revents = 0;
@@ -707,6 +708,16 @@ void hio_free(hio_t* io) {
     HV_FREE(io);
 }
 
+bool hio_is_opened(hio_t* io) {
+    if (io == NULL) return false;
+    return io->ready == 1 && io->closed == 0;
+}
+
+bool hio_is_closed(hio_t* io) {
+    if (io == NULL) return true;
+    return io->ready == 0 && io->closed == 1;
+}
+
 hio_t* hio_get(hloop_t* loop, int fd) {
     if (fd >= loop->ios.maxsize) {
         int newsize = ceil2e(fd);

+ 5 - 0
event/hloop.h

@@ -203,6 +203,8 @@ HV_EXPORT int    hio_add(hio_t* io, hio_cb cb, int events DEFAULT(HV_READ));
 HV_EXPORT int    hio_del(hio_t* io, int events DEFAULT(HV_RDWR));
 
 // hio_t fields
+// NOTE: fd cannot be used as unique identifier, so we provide an id.
+HV_EXPORT uint32_t hio_id (hio_t* io);
 HV_EXPORT int hio_fd      (hio_t* io);
 HV_EXPORT int hio_error   (hio_t* io);
 HV_EXPORT int hio_events  (hio_t* io);
@@ -210,6 +212,8 @@ HV_EXPORT int hio_revents (hio_t* io);
 HV_EXPORT hio_type_e       hio_type     (hio_t* io);
 HV_EXPORT struct sockaddr* hio_localaddr(hio_t* io);
 HV_EXPORT struct sockaddr* hio_peeraddr (hio_t* io);
+HV_EXPORT bool hio_is_opened(hio_t* io);
+HV_EXPORT bool hio_is_closed(hio_t* io);
 
 // set callbacks
 HV_EXPORT void hio_setcb_accept   (hio_t* io, haccept_cb  accept_cb);
@@ -253,6 +257,7 @@ HV_EXPORT int hio_read   (hio_t* io);
 // NOTE: hio_write is thread-safe, locked by recursive_mutex, allow to be called by other threads.
 // hio_try_write => hio_add(io, HV_WRITE) => write => hwrite_cb
 HV_EXPORT int hio_write  (hio_t* io, const void* buf, size_t len);
+// NOTE: hio_close is thread-safe, if called by other thread, hloop_post_event(hio_close_event).
 // hio_del(io, HV_RDWR) => close => hclose_cb
 HV_EXPORT int hio_close  (hio_t* io);
 

+ 18 - 0
event/nio.c

@@ -4,6 +4,7 @@
 #include "hsocket.h"
 #include "hssl.h"
 #include "hlog.h"
+#include "hthread.h"
 
 static void __connect_timeout_cb(htimer_t* timer) {
     hio_t* io = (hio_t*)timer->privdata;
@@ -560,8 +561,25 @@ disconnect:
     return nwrite;
 }
 
+static void hio_close_event_cb(hevent_t* ev) {
+    hio_t* io = (hio_t*)ev->userdata;
+    uint32_t id = (uintptr_t)ev->privdata;
+    if (io->id == id) {
+        hio_close((hio_t*)ev->userdata);
+    }
+}
+
 int hio_close (hio_t* io) {
     if (io->closed) return 0;
+    if (hv_gettid() != io->loop->tid) {
+        hevent_t ev;
+        memset(&ev, 0, sizeof(ev));
+        ev.cb = hio_close_event_cb;
+        ev.userdata = io;
+        ev.privdata = (void*)(uintptr_t)io->id;
+        hloop_post_event(io->loop, &ev);
+        return 0;
+    }
     hrecursive_mutex_lock(&io->write_mutex);
     if (!write_queue_empty(&io->write_queue) && io->error == 0 && io->close == 0) {
         hrecursive_mutex_unlock(&io->write_mutex);