Kaynağa Gözat

Add examples/multi-thread

ithewei 4 yıl önce
ebeveyn
işleme
93991d07a7

+ 12 - 0
Makefile

@@ -43,6 +43,9 @@ examples: hmain_test htimer_test hloop_test \
 	tcp_proxy_server \
 	udp_echo_server \
 	udp_proxy_server \
+	multi-acceptor-processes \
+	multi-acceptor-threads \
+	one-acceptor-multi-workers \
 	http_server_test http_client_test \
 	websocket_server_test \
 	websocket_client_test \
@@ -92,6 +95,15 @@ udp_echo_server: prepare
 udp_proxy_server: prepare
 	$(MAKEF) TARGET=$@ SRCDIRS=". base ssl event" SRCS="examples/udp_proxy_server.c"
 
+multi-acceptor-processes: prepare
+	$(MAKEF) TARGET=$@ SRCDIRS=". base ssl event" SRCS="examples/multi-thread/multi-acceptor-processes.c"
+
+multi-acceptor-threads: prepare
+	$(MAKEF) TARGET=$@ SRCDIRS=". base ssl event" SRCS="examples/multi-thread/multi-acceptor-threads.c"
+
+one-acceptor-multi-workers: prepare
+	$(MAKEF) TARGET=$@ SRCDIRS=". base ssl event" SRCS="examples/multi-thread/one-acceptor-multi-workers.c"
+
 nc: prepare
 	$(MAKEF) TARGET=$@ SRCDIRS=". base ssl event" SRCS="examples/nc.c"
 

+ 4 - 1
README-CN.md

@@ -17,7 +17,7 @@
 [![awesome-c](https://badgen.net/badge/icon/awesome-c/pink?icon=awesome&label&color)](https://github.com/oz123/awesome-c)
 [![awesome-cpp](https://badgen.net/badge/icon/awesome-cpp/pink?icon=awesome&label&color)](https://github.com/fffaraz/awesome-cpp)
 
-`libhv`是一个类似于`libevent、libev、libuv`的跨平台网络库,提供了更简单的接口和更丰富的协议。
+`libhv`是一个类似于`libevent、libev、libuv`的跨平台网络库,提供了更易用的接口和更丰富的协议。
 
 ## ✨ 特征
 
@@ -236,6 +236,9 @@ ab -c 100 -n 100000 http://127.0.0.1:8080/
 - UDP回显服务:  [examples/udp_echo_server.c](examples/udp_echo_server.c)
 - UDP代理服务:  [examples/udp_proxy_server.c](examples/udp_proxy_server.c)
 - jsonRPC示例:  [examples/jsonrpc](examples/jsonrpc)
+- 多accept进程模式: [examples/multi-thread/multi-acceptor-processes.c](examples/multi-thread/multi-acceptor-processes.c)
+- 多accept线程模式: [examples/multi-thread/multi-acceptor-threads.c](examples/multi-thread/multi-acceptor-threads.c)
+- 一个accept线程+多worker线程: [examples/multi-thread/one-acceptor-multi-workers.c](examples/multi-thread/one-acceptor-multi-workers.c)
 
 ### c++版本
 - 事件循环: [evpp/EventLoop_test.cpp](evpp/EventLoop_test.cpp)

+ 3 - 0
README.md

@@ -233,6 +233,9 @@ ab -c 100 -n 100000 http://127.0.0.1:8080/
 - [examples/udp_echo_server.c](examples/udp_echo_server.c)
 - [examples/udp_proxy_server.c](examples/udp_proxy_server.c)
 - [examples/jsonrpc](examples/jsonrpc)
+- [examples/multi-thread/multi-acceptor-processes.c](examples/multi-thread/multi-acceptor-processes.c)
+- [examples/multi-thread/multi-acceptor-threads.c](examples/multi-thread/multi-acceptor-threads.c)
+- [examples/multi-thread/one-acceptor-multi-workers.c](examples/multi-thread/one-acceptor-multi-workers.c)
 
 ### c++ version
 - [evpp/EventLoop_test.cpp](evpp/EventLoop_test.cpp)

+ 23 - 0
event/hloop.c

@@ -773,6 +773,29 @@ hio_t* hio_get(hloop_t* loop, int fd) {
     return io;
 }
 
+void hio_detach(hio_t* io) {
+    hloop_t* loop = io->loop;
+    int fd = io->fd;
+    assert(loop != NULL && fd < loop->ios.maxsize);
+    loop->ios.ptr[fd] = NULL;
+}
+
+void hio_attach(hloop_t* loop, hio_t* io) {
+    int fd = io->fd;
+    if (fd >= loop->ios.maxsize) {
+        int newsize = ceil2e(fd);
+        io_array_resize(&loop->ios, newsize > fd ? newsize : 2*fd);
+    }
+
+    if (loop->ios.ptr[fd] == NULL) {
+        io->loop = loop;
+        // NOTE: use new_loop readbuf
+        io->readbuf.base = loop->readbuf.base;
+        io->readbuf.len = loop->readbuf.len;
+        loop->ios.ptr[fd] = io;
+    }
+}
+
 int hio_add(hio_t* io, hio_cb cb, int events) {
     printd("hio_add fd=%d io->events=%d events=%d\n", io->fd, io->events, events);
 #ifdef OS_WIN

+ 23 - 0
event/hloop.h

@@ -203,6 +203,29 @@ HV_EXPORT hio_t* hio_get(hloop_t* loop, int fd);
 HV_EXPORT int    hio_add(hio_t* io, hio_cb cb, int events DEFAULT(HV_READ));
 HV_EXPORT int    hio_del(hio_t* io, int events DEFAULT(HV_RDWR));
 
+// NOTE: io detach from old loop and attach to new loop
+/* @see examples/multi-thread/one-acceptor-multi-workers.c
+void new_conn_event(hevent_t* ev) {
+    hloop_t* loop = ev->loop;
+    hio_t* io = (hio_t*)hevent_userdata(ev);
+    hio_attach(loop, io);
+}
+
+void on_accpet(hio_t* io) {
+    hio_detach(io);
+
+    hloop_t* worker_loop = get_one_loop();
+    hevent_t ev;
+    memset(&ev, 0, sizeof(ev));
+    ev.loop = worker_loop;
+    ev.cb = new_conn_event;
+    ev.userdata = io;
+    hloop_post_event(worker_loop, &ev);
+}
+ */
+HV_EXPORT void hio_detach(/*hloop_t* loop,*/ hio_t* io);
+HV_EXPORT void hio_attach(hloop_t* loop, hio_t* io);
+
 // hio_t fields
 // NOTE: fd cannot be used as unique identifier, so we provide an id.
 HV_EXPORT uint32_t hio_id (hio_t* io);

+ 71 - 0
examples/multi-thread/multi-acceptor-processes.c

@@ -0,0 +1,71 @@
+/*
+ *
+ * @build   make examples
+ * @server  bin/multi-acceptor-processes 1234
+ * @client  bin/nc 127.0.0.1 1234
+ *          nc     127.0.0.1 1234
+ *          telnet 127.0.0.1 1234
+ */
+
+#include "hloop.h"
+#include "hsocket.h"
+#include "hthread.h"
+#include "hproc.h"
+
+static const char* host = "0.0.0.0";
+static int port = 1234;
+static int process_num = 4;
+static int listenfd = INVALID_SOCKET;
+
+static void on_close(hio_t* io) {
+    printf("on_close fd=%d error=%d\n", hio_fd(io), hio_error(io));
+}
+
+static void on_recv(hio_t* io, void* buf, int readbytes) {
+    // echo
+    hio_write(io, buf, readbytes);
+}
+
+static void on_accept(hio_t* io) {
+    char localaddrstr[SOCKADDR_STRLEN] = {0};
+    char peeraddrstr[SOCKADDR_STRLEN] = {0};
+    printf("pid=%ld connfd=%d [%s] <= [%s]\n",
+            (long)hv_getpid(),
+            (int)hio_fd(io),
+            SOCKADDR_STR(hio_localaddr(io), localaddrstr),
+            SOCKADDR_STR(hio_peeraddr(io), peeraddrstr));
+
+    hio_setcb_close(io, on_close);
+    hio_setcb_read(io, on_recv);
+    hio_read(io);
+}
+
+static void loop_proc(void* userdata) {
+    hloop_t* loop = hloop_new(HLOOP_FLAG_AUTO_FREE);
+    haccept(loop, listenfd, on_accept);
+    hloop_run(loop);
+}
+
+int main(int argc, char** argv) {
+    if (argc < 2) {
+        printf("Usage: cmd port\n");
+        return -10;
+    }
+    port = atoi(argv[1]);
+
+    listenfd = Listen(port, host);
+    if (listenfd < 0) {
+        exit(1);
+    }
+
+    proc_ctx_t ctx;
+    memset(&ctx, 0, sizeof(ctx));
+    ctx.proc = loop_proc;
+    for (int i = 0; i < process_num; ++i) {
+        hproc_spawn(&ctx);
+    }
+
+    while(1) hv_sleep(1);
+
+    return 0;
+}

+ 68 - 0
examples/multi-thread/multi-acceptor-threads.c

@@ -0,0 +1,68 @@
+/*
+ *
+ * @build   make examples
+ * @server  bin/multi-acceptor-threads 1234
+ * @client  bin/nc 127.0.0.1 1234
+ *          nc     127.0.0.1 1234
+ *          telnet 127.0.0.1 1234
+ */
+
+#include "hloop.h"
+#include "hsocket.h"
+#include "hthread.h"
+
+static const char* host = "0.0.0.0";
+static int port = 1234;
+static int thread_num = 4;
+static int listenfd = INVALID_SOCKET;
+
+static void on_close(hio_t* io) {
+    printf("on_close fd=%d error=%d\n", hio_fd(io), hio_error(io));
+}
+
+static void on_recv(hio_t* io, void* buf, int readbytes) {
+    // echo
+    hio_write(io, buf, readbytes);
+}
+
+static void on_accept(hio_t* io) {
+    char localaddrstr[SOCKADDR_STRLEN] = {0};
+    char peeraddrstr[SOCKADDR_STRLEN] = {0};
+    printf("tid=%ld connfd=%d [%s] <= [%s]\n",
+            (long)hv_gettid(),
+            (int)hio_fd(io),
+            SOCKADDR_STR(hio_localaddr(io), localaddrstr),
+            SOCKADDR_STR(hio_peeraddr(io), peeraddrstr));
+
+    hio_setcb_close(io, on_close);
+    hio_setcb_read(io, on_recv);
+    hio_read(io);
+}
+
+static HTHREAD_RETTYPE loop_thread(void* userdata) {
+    hloop_t* loop = hloop_new(HLOOP_FLAG_AUTO_FREE);
+    haccept(loop, listenfd, on_accept);
+    hloop_run(loop);
+    return 0;
+}
+
+int main(int argc, char** argv) {
+    if (argc < 2) {
+        printf("Usage: cmd port\n");
+        return -10;
+    }
+    port = atoi(argv[1]);
+
+    listenfd = Listen(port, host);
+    if (listenfd < 0) {
+        exit(1);
+    }
+
+    for (int i = 0; i < thread_num; ++i) {
+        hthread_create(loop_thread, NULL);
+    }
+
+    while(1) hv_sleep(1);
+
+    return 0;
+}

+ 100 - 0
examples/multi-thread/one-acceptor-multi-workers.c

@@ -0,0 +1,100 @@
+/*
+ *
+ * @build   make examples
+ * @server  bin/one-acceptor-multi-workers 1234
+ * @client  bin/nc 127.0.0.1 1234
+ *          nc     127.0.0.1 1234
+ *          telnet 127.0.0.1 1234
+ */
+
+#include "hloop.h"
+#include "hsocket.h"
+#include "hthread.h"
+
+static const char* host = "0.0.0.0";
+static int port = 1234;
+static int thread_num = 4;
+static hloop_t*  accept_loop = NULL;
+static hloop_t** worker_loops = NULL;
+
+static hloop_t* get_next_loop() {
+    static int s_cur_index = 0;
+    if (s_cur_index == thread_num) {
+        s_cur_index = 0;
+    }
+    return worker_loops[s_cur_index++];
+}
+
+static void on_close(hio_t* io) {
+    printf("on_close fd=%d error=%d\n", hio_fd(io), hio_error(io));
+}
+
+static void on_recv(hio_t* io, void* buf, int readbytes) {
+    // echo
+    hio_write(io, buf, readbytes);
+}
+
+static void new_conn_event(hevent_t* ev) {
+    hloop_t* loop = ev->loop;
+    hio_t* io = (hio_t*)hevent_userdata(ev);
+    hio_attach(loop, io);
+
+    char localaddrstr[SOCKADDR_STRLEN] = {0};
+    char peeraddrstr[SOCKADDR_STRLEN] = {0};
+    printf("tid=%ld connfd=%d [%s] <= [%s]\n",
+            (long)hv_gettid(),
+            (int)hio_fd(io),
+            SOCKADDR_STR(hio_localaddr(io), localaddrstr),
+            SOCKADDR_STR(hio_peeraddr(io), peeraddrstr));
+
+    hio_setcb_close(io, on_close);
+    hio_setcb_read(io, on_recv);
+    hio_read(io);
+}
+
+static void on_accept(hio_t* io) {
+    hio_detach(io);
+
+    hloop_t* worker_loop = get_next_loop();
+    hevent_t ev;
+    memset(&ev, 0, sizeof(ev));
+    ev.loop = worker_loop;
+    ev.cb = new_conn_event;
+    ev.userdata = io;
+    hloop_post_event(worker_loop, &ev);
+}
+
+static HTHREAD_RETTYPE worker_thread(void* userdata) {
+    hloop_t* loop = (hloop_t*)userdata;
+    hloop_run(loop);
+    return 0;
+}
+
+static HTHREAD_RETTYPE accept_thread(void* userdata) {
+    hloop_t* loop = (hloop_t*)userdata;
+    hio_t* listenio = hloop_create_tcp_server(loop, host, port, on_accept);
+    if (listenio == NULL) {
+        exit(1);
+    }
+    hloop_run(loop);
+    return 0;
+}
+
+int main(int argc, char** argv) {
+    if (argc < 2) {
+        printf("Usage: cmd port\n");
+        return -10;
+    }
+    port = atoi(argv[1]);
+
+    worker_loops = (hloop_t**)malloc(sizeof(hloop_t*) * thread_num);
+    for (int i = 0; i < thread_num; ++i) {
+        worker_loops[i] = hloop_new(HLOOP_FLAG_AUTO_FREE);
+        hthread_create(worker_thread, worker_loops[i]);
+    }
+
+    accept_loop = hloop_new(HLOOP_FLAG_AUTO_FREE);
+    accept_thread(accept_loop);
+
+    return 0;
+}