Sfoglia il codice sorgente

hloop_wakeup when hloop_stop called by other thread

hewei.it 5 anni fa
parent
commit
c93cab86db
5 ha cambiato i file con 161 aggiunte e 91 eliminazioni
  1. 2 0
      event/hevent.h
  2. 100 69
      event/hloop.c
  3. 8 0
      event/hloop.h
  4. 10 0
      examples/http_server_test.cpp
  5. 41 22
      http/server/HttpServer.cpp

+ 2 - 0
event/hevent.h

@@ -29,6 +29,8 @@ struct hloop_s {
     uint64_t    end_hrtime;
     uint64_t    cur_hrtime;
     uint64_t    loop_cnt;
+    long        pid;
+    long        tid;
     void*       userdata;
 //private:
     // events

+ 100 - 69
event/hloop.c

@@ -8,6 +8,7 @@
 #include "hmath.h"
 #include "htime.h"
 #include "hsocket.h"
+#include "hthread.h"
 
 #define PAUSE_TIME              10          // ms
 #define MAX_BLOCK_TIME          1000        // ms
@@ -15,6 +16,9 @@
 #define IO_ARRAY_INIT_SIZE              1024
 #define CUSTOM_EVENT_QUEUE_INIT_SIZE    16
 
+#define SOCKPAIR_WRITE_INDEX    0
+#define SOCKPAIR_READ_INDEX     1
+
 static void __hidle_del(hidle_t* idle);
 static void __htimer_del(htimer_t* timer);
 
@@ -155,6 +159,56 @@ process_timers:
     return ncbs;
 }
 
+static void sockpair_read_cb(hio_t* io, void* buf, int readbytes) {
+    hloop_t* loop = io->loop;
+    hevent_t* pev = NULL;
+    hevent_t ev;
+    for (int i = 0; i < readbytes; ++i) {
+        hmutex_lock(&loop->custom_events_mutex);
+        if (event_queue_empty(&loop->custom_events)) {
+            goto unlock;
+        }
+        pev = event_queue_front(&loop->custom_events);
+        if (pev == NULL) {
+            goto unlock;
+        }
+        ev = *pev;
+        event_queue_pop_front(&loop->custom_events);
+        // NOTE: unlock before cb, avoid deadlock if hloop_post_event called in cb.
+        hmutex_unlock(&loop->custom_events_mutex);
+        if (ev.cb) {
+            ev.cb(&ev);
+        }
+    }
+    return;
+unlock:
+    hmutex_unlock(&loop->custom_events_mutex);
+}
+
+void hloop_post_event(hloop_t* loop, hevent_t* ev) {
+    char buf = '1';
+
+    if (loop->sockpair[0] == -1 || loop->sockpair[1] == -1) {
+        hlogw("socketpair not created!");
+        return;
+    }
+
+    if (ev->loop == NULL) {
+        ev->loop = loop;
+    }
+    if (ev->event_type == 0) {
+        ev->event_type = HEVENT_TYPE_CUSTOM;
+    }
+
+    hmutex_lock(&loop->custom_events_mutex);
+    if (ev->event_id == 0) {
+        ev->event_id = ++loop->event_counter;
+    }
+    hwrite(loop, loop->sockpair[SOCKPAIR_WRITE_INDEX], &buf, 1, NULL);
+    event_queue_push_back(&loop->custom_events, ev);
+    hmutex_unlock(&loop->custom_events_mutex);
+}
+
 static void hloop_init(hloop_t* loop) {
     loop->status = HLOOP_STATUS_STOP;
 
@@ -164,20 +218,26 @@ static void hloop_init(hloop_t* loop) {
     // timers
     heap_init(&loop->timers, timers_compare);
 
-    // ios: init when hio_get
-    // io_array_init(&loop->ios, IO_ARRAY_INIT_SIZE);
+    // ios
+    io_array_init(&loop->ios, IO_ARRAY_INIT_SIZE);
 
-    // readbuf: alloc when hio_set_readbuf
-    // loop->readbuf.len = HLOOP_READ_BUFSIZE;
-    // HV_ALLOC(loop->readbuf.base, loop->readbuf.len);
+    // readbuf
+    loop->readbuf.len = HLOOP_READ_BUFSIZE;
+    HV_ALLOC(loop->readbuf.base, loop->readbuf.len);
 
-    // iowatcher: init when iowatcher_add_event
-    // iowatcher_init(loop);
+    // iowatcher
+    iowatcher_init(loop);
 
-    // custom_events: init when hloop_post_event
-    // event_queue_init(&loop->custom_events, 4);
-    loop->sockpair[0] = loop->sockpair[1] = -1;
+    // custom_events
     hmutex_init(&loop->custom_events_mutex);
+    event_queue_init(&loop->custom_events, CUSTOM_EVENT_QUEUE_INIT_SIZE);
+    loop->sockpair[0] = loop->sockpair[1] = -1;
+    if (Socketpair(AF_INET, SOCK_STREAM, 0, loop->sockpair) != 0) {
+        hloge("socketpair create failed!");
+    }
+    if (loop->sockpair[0] != -1 && loop->sockpair[1] != -1) {
+        hread(loop, loop->sockpair[SOCKPAIR_READ_INDEX], loop->readbuf.base, loop->readbuf.len, sockpair_read_cb);
+    }
 
     // NOTE: init start_time here, because htimer_add use it.
     loop->start_ms = gettimeofday_ms();
@@ -261,6 +321,8 @@ void hloop_free(hloop_t** pp) {
 }
 
 int hloop_run(hloop_t* loop) {
+    loop->pid = hv_getpid();
+    loop->tid = hv_gettid();
     loop->status = HLOOP_STATUS_RUNNING;
     while (loop->status != HLOOP_STATUS_STOP) {
         if (loop->status == HLOOP_STATUS_PAUSE) {
@@ -269,7 +331,8 @@ int hloop_run(hloop_t* loop) {
             continue;
         }
         ++loop->loop_cnt;
-        if (loop->nactives == 0 && loop->flags & HLOOP_FLAG_QUIT_WHEN_NO_ACTIVE_EVENTS) {
+        // NOTE: socketpair have one HV_READ
+        if (loop->nactives < 2 && loop->flags & HLOOP_FLAG_QUIT_WHEN_NO_ACTIVE_EVENTS) {
             break;
         }
         hloop_process_events(loop);
@@ -286,8 +349,26 @@ int hloop_run(hloop_t* loop) {
     return 0;
 }
 
+int hloop_wakeup(hloop_t* loop) {
+    hevent_t ev;
+    memset(&ev, 0, sizeof(ev));
+    hloop_post_event(loop, &ev);
+    return 0;
+}
+
+static void hloop_stop_event_cb(hevent_t* ev) {
+    ev->loop->status = HLOOP_STATUS_STOP;
+}
+
 int hloop_stop(hloop_t* loop) {
     loop->status = HLOOP_STATUS_STOP;
+    if (hv_gettid() != loop->tid) {
+        hevent_t ev;
+        memset(&ev, 0, sizeof(ev));
+        ev.priority = HEVENT_HIGHEST_PRIORITY;
+        ev.cb = hloop_stop_event_cb;
+        hloop_post_event(loop, &ev);
+    }
     return 0;
 }
 
@@ -325,6 +406,14 @@ uint64_t hloop_now_hrtime(hloop_t* loop) {
     return loop->start_ms * 1000 + (loop->cur_hrtime - loop->start_hrtime);
 }
 
+long hloop_pid(hloop_t* loop) {
+    return loop->pid;
+}
+
+long hloop_tid(hloop_t* loop) {
+    return loop->tid;
+}
+
 void  hloop_set_userdata(hloop_t* loop, void* userdata) {
     loop->userdata = userdata;
 }
@@ -578,10 +667,6 @@ void hio_free(hio_t* io) {
 }
 
 hio_t* hio_get(hloop_t* loop, int fd) {
-    if (loop->ios.maxsize == 0) {
-        io_array_init(&loop->ios, IO_ARRAY_INIT_SIZE);
-    }
-
     if (fd >= loop->ios.maxsize) {
         int newsize = ceil2e(fd);
         io_array_resize(&loop->ios, newsize > fd ? newsize : 2*fd);
@@ -794,57 +879,3 @@ hio_t* hloop_create_udp_client(hloop_t* loop, const char* host, int port) {
     hio_set_peeraddr(io, &peeraddr.sa, sockaddr_len(&peeraddr));
     return io;
 }
-
-static void sockpair_read_cb(hio_t* io, void* buf, int readbytes) {
-    hloop_t* loop = io->loop;
-    hevent_t* pev = NULL;
-    hevent_t ev;
-    for (int i = 0; i < readbytes; ++i) {
-        hmutex_lock(&loop->custom_events_mutex);
-        if (event_queue_empty(&loop->custom_events)) {
-            goto unlock;
-        }
-        pev = event_queue_front(&loop->custom_events);
-        if (pev == NULL) {
-            goto unlock;
-        }
-        ev = *pev;
-        event_queue_pop_front(&loop->custom_events);
-        // NOTE: unlock before cb, avoid deadlock if hloop_post_event called in cb.
-        hmutex_unlock(&loop->custom_events_mutex);
-        if (ev.cb) {
-            ev.cb(&ev);
-        }
-    }
-    return;
-unlock:
-    hmutex_unlock(&loop->custom_events_mutex);
-}
-
-void hloop_post_event(hloop_t* loop, hevent_t* ev) {
-    char buf = '1';
-    hmutex_lock(&loop->custom_events_mutex);
-    if (loop->sockpair[0] <= 0 && loop->sockpair[1] <= 0) {
-        if (Socketpair(AF_INET, SOCK_STREAM, 0, loop->sockpair) != 0) {
-            hloge("socketpair error");
-            goto unlock;
-        }
-        hread(loop, loop->sockpair[1], loop->readbuf.base, loop->readbuf.len, sockpair_read_cb);
-    }
-    if (loop->custom_events.maxsize == 0) {
-        event_queue_init(&loop->custom_events, CUSTOM_EVENT_QUEUE_INIT_SIZE);
-    }
-    if (ev->loop == NULL) {
-        ev->loop = loop;
-    }
-    if (ev->event_type == 0) {
-        ev->event_type = HEVENT_TYPE_CUSTOM;
-    }
-    if (ev->event_id == 0) {
-        ev->event_id = ++loop->event_counter;
-    }
-    event_queue_push_back(&loop->custom_events, ev);
-    hwrite(loop, loop->sockpair[0], &buf, 1, NULL);
-unlock:
-    hmutex_unlock(&loop->custom_events_mutex);
-}

+ 8 - 0
event/hloop.h

@@ -108,15 +108,23 @@ HV_EXPORT void hloop_free(hloop_t** pp);
 
 // NOTE: when no active events, loop will quit if HLOOP_FLAG_QUIT_WHEN_NO_ACTIVE_EVENTS set.
 HV_EXPORT int hloop_run(hloop_t* loop);
+// NOTE: hloop_stop called in loop-thread just set flag to quit in next loop,
+// if called in other thread, it will wakeup loop-thread from blocking poll system call,
+// then you should join loop thread to safely exit loop thread.
 HV_EXPORT int hloop_stop(hloop_t* loop);
 HV_EXPORT int hloop_pause(hloop_t* loop);
 HV_EXPORT int hloop_resume(hloop_t* loop);
+HV_EXPORT int hloop_wakeup(hloop_t* loop);
 
 HV_EXPORT void     hloop_update_time(hloop_t* loop);
 HV_EXPORT uint64_t hloop_now(hloop_t* loop);          // s
 HV_EXPORT uint64_t hloop_now_ms(hloop_t* loop);       // ms
 HV_EXPORT uint64_t hloop_now_hrtime(hloop_t* loop);   // us
 #define hloop_now_us hloop_now_hrtime
+// @return pid of hloop_run
+HV_EXPORT long hloop_pid(hloop_t* loop);
+// @return tid of hloop_run
+HV_EXPORT long hloop_tid(hloop_t* loop);
 
 // userdata
 HV_EXPORT void  hloop_set_userdata(hloop_t* loop, void* userdata);

+ 10 - 0
examples/http_server_test.cpp

@@ -1,6 +1,8 @@
 #include "HttpServer.h"
 
 int main() {
+    HV_MEMCHECK;
+
     HttpService service;
     service.GET("/ping", [](HttpRequest* req, HttpResponse* resp) {
         resp->body = "pong";
@@ -16,6 +18,14 @@ int main() {
     http_server_t server;
     server.port = 8080;
     server.service = &service;
+
+#if 1
     http_server_run(&server);
+#else
+    // test http_server_stop
+    http_server_run(&server, 0);
+    sleep(10);
+    http_server_stop(&server);
+#endif
     return 0;
 }

+ 41 - 22
http/server/HttpServer.cpp

@@ -17,8 +17,10 @@ static HttpService  s_default_service;
 static FileCache    s_filecache;
 
 struct HttpServerPrivdata {
+    int                     quit;
     std::vector<hloop_t*>   loops;
-    std::mutex              loops_mutex;
+    std::vector<hthread_t>  threads;
+    std::mutex              mutex_;
 };
 
 static void on_recv(hio_t* io, void* _buf, int readbytes) {
@@ -190,10 +192,9 @@ handle_request:
         }
     }
 
-    static long s_pid = hv_getpid();
-    long tid = hv_gettid();
+    hloop_t* loop = hevent_loop(io);
     hlogi("[%ld-%ld][%s:%d][%s %s]=>[%d %s]",
-        s_pid, tid,
+        hloop_pid(loop), hloop_tid(loop),
         handler->ip, handler->port,
         http_method_str(req->method), req->path.c_str(),
         res->status_code, res->status_message());
@@ -280,11 +281,18 @@ static void worker_fn(void* userdata) {
     // timer handle_cached_files
     htimer_t* timer = htimer_add(loop, handle_cached_files, s_filecache.file_cached_time * 1000);
     hevent_set_userdata(timer, &s_filecache);
+
     // for SDK implement http_server_stop
     HttpServerPrivdata* privdata = (HttpServerPrivdata*)server->privdata;
-    privdata->loops_mutex.lock();
-    privdata->loops.push_back(loop);
-    privdata->loops_mutex.unlock();
+    if (privdata) {
+        std::lock_guard<std::mutex> locker(privdata->mutex_);
+        if (privdata->quit) {
+            hloop_free(&loop);
+            return;
+        }
+        privdata->loops.push_back(loop);
+    }
+
     hloop_run(loop);
     hloop_free(&loop);
 }
@@ -298,37 +306,48 @@ int http_server_run(http_server_t* server, int wait) {
     server->listenfd = Listen(server->port, server->host);
     if (server->listenfd < 0) return server->listenfd;
 
-    // privdata
-    server->privdata = new HttpServerPrivdata;
-
     if (server->worker_processes) {
+        // multi-processes
         return master_workers_run(worker_fn, server, server->worker_processes, server->worker_threads, wait);
     }
     else {
-        // NOTE: master_workers_run use global-vars that may be used by other,
-        // so we implement Multi-Threads directly.
+        // multi-threads
         int worker_threads = server->worker_threads;
         if (worker_threads == 0) worker_threads = 1;
+
+        // for SDK implement http_server_stop
+        HttpServerPrivdata* privdata = new HttpServerPrivdata;
+        privdata->quit = 0;
+        server->privdata = privdata;
+
+        int i = wait ? 1 : 0;
+        for (; i < worker_threads; ++i) {
+            hthread_t thrd = hthread_create((hthread_routine)worker_fn, server);
+            privdata->threads.push_back(thrd);
+        }
         if (wait) {
-            for (int i = 1; i < worker_threads; ++i) {
-                hthread_create((hthread_routine)worker_fn, server);
-            }
             worker_fn(server);
         }
-        else {
-            for (int i = 0; i < worker_threads; ++i) {
-                hthread_create((hthread_routine)worker_fn, server);
-            }
-        }
         return 0;
     }
 }
 
 int http_server_stop(http_server_t* server) {
     HttpServerPrivdata* privdata = (HttpServerPrivdata*)server->privdata;
-    for (auto& loop : privdata->loops) {
+    if (privdata == NULL) return 0;
+
+    privdata->mutex_.lock();
+    privdata->quit = 1;
+    for (auto loop : privdata->loops) {
         hloop_stop(loop);
     }
-    SAFE_DELETE(privdata);
+    privdata->mutex_.unlock();
+
+    for (auto thrd : privdata->threads) {
+        hthread_join(thrd);
+    }
+
+    delete privdata;
+    server->privdata = NULL;
     return 0;
 }