Explorar o código

master_workers_run

hewei %!s(int64=6) %!d(string=hai) anos
pai
achega
c14ace0cf2
Modificáronse 7 ficheiros con 189 adicións e 153 borrados
  1. 1 0
      etc/test.conf
  2. 1 1
      http/server/FileCache.h
  3. 30 82
      http/server/HttpServer.cpp
  4. 1 1
      http/server/HttpServer.h
  5. 13 38
      main.cpp.tmpl
  6. 128 25
      utils/hmain.cpp
  7. 15 6
      utils/hmain.h

+ 1 - 0
etc/test.conf

@@ -9,5 +9,6 @@ log_filesize = 16M
 # worker_processes = 4
 # auto = ncpu
 worker_processes = auto
+worker_threads = 2
 
 port = 8086

+ 1 - 1
http/server/FileCache.h

@@ -53,6 +53,7 @@ public:
     int file_stat_interval;
     int file_cached_time;
     FileCacheMap cached_files;
+    std::mutex mutex_;
 
     FileCache() {
         file_stat_interval  = DEFAULT_FILE_STAT_INTERVAL;
@@ -70,7 +71,6 @@ public:
     int Close(const char* filepath);
 protected:
     file_cache_t* Get(const char* filepath);
-    std::mutex mutex_;
 };
 
 #endif // HV_FILE_CACHE_H_

+ 30 - 82
http/server/HttpServer.cpp

@@ -15,26 +15,9 @@
 static HttpService  s_default_service;
 static FileCache    s_filecache;
 
-static void master_init(void* userdata) {
-#ifdef OS_UNIX
-    char proctitle[256] = {0};
-    snprintf(proctitle, sizeof(proctitle), "%s: master process", g_main_ctx.program_name);
-    setproctitle(proctitle);
-#endif
-}
-
-static void master_proc(void* userdata) {
-    while (1) sleep(1);
-}
-
-static void worker_init(void* userdata) {
-#ifdef OS_UNIX
-    char proctitle[256] = {0};
-    snprintf(proctitle, sizeof(proctitle), "%s: worker process", g_main_ctx.program_name);
-    setproctitle(proctitle);
-    signal(SIGNAL_RELOAD, signal_handler);
-#endif
-}
+struct HttpServerPrivdata {
+    std::vector<hloop_t*>   loops;
+};
 
 static void on_recv(hio_t* io, void* _buf, int readbytes) {
     // printf("on_recv fd=%d readbytes=%d\n", hio_fd(io), readbytes);
@@ -262,6 +245,7 @@ static void handle_cached_files(htimer_t* timer) {
     file_cache_t* fc = NULL;
     time_t tt;
     time(&tt);
+    std::lock_guard<std::mutex> locker(pfc->mutex_);
     auto iter = pfc->cached_files.begin();
     while (iter != pfc->cached_files.end()) {
         fc = iter->second;
@@ -278,15 +262,10 @@ static void fsync_logfile(hidle_t* idle) {
     hlog_fsync();
 }
 
-static HTHREAD_ROUTINE(worker_thread) {
-    hlogi("worker_thread pid=%d tid=%d", getpid(), gettid());
+static void worker_fn(void* userdata) {
     http_server_t* server = (http_server_t*)userdata;
     int listenfd = server->listenfd;
     hloop_t* loop = hloop_new(0);
-    // for SDK implement http_server_stop
-    if (server->worker_processes == 0 && server->worker_threads <= 1) {
-        server->privdata = (void*)loop;
-    }
     hio_t* listenio = haccept(loop, listenfd, on_accept);
     hevent_set_userdata(listenio, server->service);
     if (server->ssl) {
@@ -298,28 +277,14 @@ static HTHREAD_ROUTINE(worker_thread) {
     // timer handle_cached_files
     htimer_t* timer = htimer_add(loop, handle_cached_files, s_filecache.file_cached_time * 1000);
     hevent_set_userdata(timer, &s_filecache);
+    // for SDK implement http_server_stop
+    HttpServerPrivdata* privdata = (HttpServerPrivdata*)server->privdata;
+    privdata->loops.push_back(loop);
     hloop_run(loop);
     hloop_free(&loop);
-    return 0;
-}
-
-static void worker_proc(void* userdata) {
-    http_server_t* server = (http_server_t*)userdata;
-    if (server->worker_threads == 0) {
-        worker_thread(userdata);
-    }
-    else {
-        for (int i = 0; i < server->worker_threads; ++i) {
-            hthread_create(worker_thread, server);
-        }
-    }
 }
 
 int http_server_run(http_server_t* server, int wait) {
-    // worker_processes
-    if (server->worker_processes != 0 && g_worker_processes_num != 0 && g_worker_processes != NULL) {
-        return ERR_OVER_LIMIT;
-    }
     // service
     if (server->service == NULL) {
         server->service = &s_default_service;
@@ -328,54 +293,37 @@ int http_server_run(http_server_t* server, int wait) {
     server->listenfd = Listen(server->port, server->host);
     if (server->listenfd < 0) return server->listenfd;
 
-#ifdef OS_WIN
-    // windows not provide MultiProcesses
-    if (server->g_worker_processes != 0) {
-        server->worker_threads = server->worker_proc;
-        server->worker_processes = 0;
-    }
-#endif
+    // privdata
+    server->privdata = new HttpServerPrivdata;
 
-    if (server->worker_processes == 0) {
-        if (wait == 0 && server->worker_threads == 0) {
-            server->worker_threads = 1;
-        }
-        worker_proc(server);
-        if (wait) {
-            master_proc(NULL);
-        }
+    if (server->worker_processes) {
+        return master_workers_run(worker_fn, server, server->worker_processes, server->worker_threads, wait);
     }
     else {
-        // master-workers processes
-        g_worker_processes_num = server->worker_processes;
-        int bytes = g_worker_processes_num * sizeof(proc_ctx_t);
-        g_worker_processes = (proc_ctx_t*)malloc(bytes);
-        memset(g_worker_processes, 0, bytes);
-        for (int i = 0; i < g_worker_processes_num; ++i) {
-            proc_ctx_t* ctx = g_worker_processes + i;
-            ctx->init = worker_init;
-            ctx->init_userdata = NULL;
-            ctx->proc = worker_proc;
-            ctx->proc_userdata = server;
-            spawn_proc(ctx);
-            hlogi("workers[%d] start/running, pid=%d", i, ctx->pid);
-        }
+        // NOTE: master_workers_run use global-vars that may be used by other,
+        // so we implement Multi-Threads directly.
+        int worker_threads = server->worker_threads;
+        if (worker_threads == 0) worker_threads = 1;
         if (wait) {
-            master_init(NULL);
-            master_proc(NULL);
+            for (int i = 1; i < worker_threads; ++i) {
+                hthread_create((hthread_routine)worker_fn, server);
+            }
+            worker_fn(server);
+        }
+        else {
+            for (int i = 0; i < worker_threads; ++i) {
+                hthread_create((hthread_routine)worker_fn, server);
+            }
         }
+        return 0;
     }
-
-    return 0;
 }
 
 int http_server_stop(http_server_t* server) {
-    if (server->worker_processes == 0 && server->worker_threads <= 1) {
-        if (server->privdata) {
-            hloop_t* loop = (hloop_t*)server->privdata;
-            hloop_stop(loop);
-            server->privdata = NULL;
-        }
+    HttpServerPrivdata* privdata = (HttpServerPrivdata*)server->privdata;
+    for (auto& loop : privdata->loops) {
+        hloop_stop(loop);
     }
+    SAFE_DELETE(privdata);
     return 0;
 }

+ 1 - 1
http/server/HttpServer.h

@@ -55,7 +55,7 @@ int main() {
  */
 int http_server_run(http_server_t* server, int wait = 1);
 
-// just for worker_processes = 0 && worker_threads <= 1
+// just for worker_processes = 0
 int http_server_stop(http_server_t* server);
 
 #endif

+ 13 - 38
main.cpp.tmpl

@@ -6,6 +6,7 @@ typedef struct conf_ctx_s {
     IniParser* parser;
     int loglevel;
     int worker_processes;
+    int worker_threads;
     int port;
 } conf_ctx_t;
 conf_ctx_t g_conf_ctx;
@@ -14,6 +15,7 @@ inline void conf_ctx_init(conf_ctx_t* ctx) {
     ctx->parser = new IniParser;
     ctx->loglevel = LOG_LEVEL_DEBUG;
     ctx->worker_processes = 0;
+    ctx->worker_threads = 0;
     ctx->port = 0;
 }
 
@@ -21,11 +23,7 @@ static void print_version();
 static void print_help();
 
 static int  parse_confile(const char* confile);
-
-static void master_init(void* userdata);
-static void master_proc(void* userdata);
-static void worker_init(void* userdata);
-static void worker_proc(void* userdata);
+static void worker_fn(void* userdata);
 
 // short options
 static const char options[] = "hvc:ts:dp:";
@@ -142,6 +140,9 @@ int parse_confile(const char* confile) {
         }
     }
     g_conf_ctx.worker_processes = LIMIT(0, worker_processes, MAXNUM_WORKER_PROCESSES);
+    // worker_threads
+    int worker_threads = g_conf_ctx.parser->Get<int>("worker_threads");
+    g_conf_ctx.worker_threads = LIMIT(0, worker_threads, 16);
 
     // port
     int port = 0;
@@ -268,41 +269,15 @@ int main(int argc, char** argv) {
     // pidfile
     create_pidfile();
 
-    if (g_conf_ctx.worker_processes == 0) {
-        // single process
-        worker_proc(NULL);
-    }
-    else {
-        // master-workers processes
-        g_worker_processes_num = g_conf_ctx.worker_processes;
-        int bytes = sizeof(proc_ctx_t) * g_worker_processes_num;
-        g_worker_processes = (proc_ctx_t*)malloc(bytes);
-        if (g_worker_processes == NULL) {
-            return ERR_MALLOC;
-        }
-        memset(g_worker_processes, 0, bytes);
-        proc_ctx_t* ctx = g_worker_processes;
-        for (int i = 0; i < g_worker_processes_num; ++i, ++ctx) {
-            ctx->init = worker_init;
-            ctx->init_userdata = NULL;
-            ctx->proc = worker_proc;
-            ctx->proc_userdata = NULL;
-            spawn_proc(ctx);
-            hlogi("workers[%d] start/running, pid=%d", i, ctx->pid);
-        }
-
-        hlogi("master start/running, pid=%d", g_main_ctx.pid);
-        master_init(NULL);
-        master_proc(NULL);
-    }
+    master_workers_run(worker_fn, (void*)100L, g_conf_ctx.worker_processes, g_conf_ctx.worker_threads);
 
     return 0;
 }
 
-void master_proc(void* userdata) {
-    while (1) sleep(1);
-}
-
-void worker_proc(void* userdata) {
-    while (1) sleep(1);
+void worker_fn(void* userdata) {
+    int num = (int)(long)(userdata);
+    while (1) {
+        printf("num=%d pid=%d tid=%d\n", num, getpid(), gettid());
+        sleep(60);
+    }
 }

+ 128 - 25
utils/hmain.cpp

@@ -3,13 +3,31 @@
 #include "hplatform.h"
 #include "hlog.h"
 #include "htime.h"
+#include "herr.h"
+#include "hthread.h"
 
 main_ctx_t  g_main_ctx;
 int         g_worker_processes_num = 0;
+int         g_worker_threads_num = 0;
 proc_ctx_t* g_worker_processes = NULL;
+procedure_t g_worker_fn = NULL;
+void*       g_worker_userdata = NULL;
 
 int main_ctx_init(int argc, char** argv) {
-    g_main_ctx.pid = getpid();
+    if (argc == 0 || argv == NULL) {
+        argc = 1;
+        argv = (char**)malloc(2*sizeof(char*));
+        argv[0] = (char*)malloc(MAX_PATH);
+        argv[1] = NULL;
+#ifdef OS_WIN
+        GetModuleFileName(NULL, argv[0], MAX_PATH);
+#elif defined(OS_LINUX)
+        readlink("/proc/self/exe", argv[0], MAX_PATH);
+#else
+        strcpy(argv[0], "./unnamed");
+#endif
+    }
+
     char* cwd = getcwd(g_main_ctx.run_path, sizeof(g_main_ctx.run_path));
     if (cwd == NULL) {
         printf("getcwd error\n");
@@ -32,6 +50,29 @@ int main_ctx_init(int argc, char** argv) {
     }
 #endif
     //printf("program_name=%s\n", g_main_ctx.program_name);
+    char logpath[MAX_PATH] = {0};
+    snprintf(logpath, sizeof(logpath), "%s/logs", g_main_ctx.run_path);
+    MKDIR(logpath);
+    snprintf(g_main_ctx.confile, sizeof(g_main_ctx.confile), "%s/etc/%s.conf", g_main_ctx.run_path, g_main_ctx.program_name);
+    snprintf(g_main_ctx.pidfile, sizeof(g_main_ctx.pidfile), "%s/logs/%s.pid", g_main_ctx.run_path, g_main_ctx.program_name);
+    snprintf(g_main_ctx.logfile, sizeof(g_main_ctx.confile), "%s/logs/%s.log", g_main_ctx.run_path, g_main_ctx.program_name);
+    hlog_set_file(g_main_ctx.logfile);
+
+    g_main_ctx.pid = getpid();
+    g_main_ctx.oldpid = getpid_from_pidfile();
+#ifdef OS_UNIX
+    if (kill(g_main_ctx.oldpid, 0) == -1 && errno == ESRCH) {
+        g_main_ctx.oldpid = -1;
+    }
+#else
+    HANDLE hproc = OpenProcess(PROCESS_TERMINATE, FALSE, g_main_ctx.oldpid);
+    if (hproc == NULL) {
+        g_main_ctx.oldpid = -1;
+    }
+    else {
+        CloseHandle(hproc);
+    }
+#endif
 
     // save arg
     int i = 0;
@@ -90,30 +131,6 @@ int main_ctx_init(int argc, char** argv) {
     }
 #endif
 
-    char logpath[MAX_PATH] = {0};
-    snprintf(logpath, sizeof(logpath), "%s/logs", g_main_ctx.run_path);
-    MKDIR(logpath);
-    snprintf(g_main_ctx.confile, sizeof(g_main_ctx.confile), "%s/etc/%s.conf", g_main_ctx.run_path, g_main_ctx.program_name);
-    snprintf(g_main_ctx.pidfile, sizeof(g_main_ctx.pidfile), "%s/logs/%s.pid", g_main_ctx.run_path, g_main_ctx.program_name);
-    snprintf(g_main_ctx.logfile, sizeof(g_main_ctx.confile), "%s/logs/%s.log", g_main_ctx.run_path, g_main_ctx.program_name);
-
-    hlog_set_file(g_main_ctx.logfile);
-
-    g_main_ctx.oldpid = getpid_from_pidfile();
-#ifdef OS_UNIX
-    if (kill(g_main_ctx.oldpid, 0) == -1 && errno == ESRCH) {
-        g_main_ctx.oldpid = -1;
-    }
-#else
-    HANDLE hproc = OpenProcess(PROCESS_TERMINATE, FALSE, g_main_ctx.oldpid);
-    if (hproc == NULL) {
-        g_main_ctx.oldpid = -1;
-    }
-    else {
-        CloseHandle(hproc);
-    }
-#endif
-
     return 0;
 }
 
@@ -502,3 +519,89 @@ void handle_signal(const char* signal) {
     }
     printf("%s start/running\n", g_main_ctx.program_name);
 }
+
+// master-workers processes
+static HTHREAD_ROUTINE(worker_thread) {
+    hlogi("worker_thread pid=%d tid=%d", getpid(), gettid());
+    if (g_worker_fn) {
+        g_worker_fn(g_worker_userdata);
+    }
+    return 0;
+}
+
+static void worker_init(void* userdata) {
+#ifdef OS_UNIX
+    char proctitle[256] = {0};
+    snprintf(proctitle, sizeof(proctitle), "%s: worker process", g_main_ctx.program_name);
+    setproctitle(proctitle);
+    signal(SIGNAL_RELOAD, signal_handler);
+#endif
+}
+
+static void worker_proc(void* userdata) {
+    for (int i = 1; i < g_worker_threads_num; ++i) {
+        hthread_create(worker_thread, NULL);
+    }
+    worker_thread(NULL);
+}
+
+int master_workers_run(procedure_t worker_fn, void* worker_userdata,
+        int worker_processes, int worker_threads, bool wait) {
+#ifdef OS_WIN
+        // NOTE: Windows not provide MultiProcesses
+        if (worker_threads == 0) {
+            // MultiProcesses => MultiThreads
+            worker_threads = worker_processes;
+        }
+        worker_processes = 0;
+#endif
+    if (worker_threads == 0) worker_threads = 1;
+
+    g_worker_threads_num = worker_threads;
+    g_worker_fn = worker_fn;
+    g_worker_userdata = worker_userdata;
+
+    if (worker_processes == 0) {
+        // single process
+        if (wait) {
+            for (int i = 1; i < worker_threads; ++i) {
+                hthread_create(worker_thread, NULL);
+            }
+            worker_thread(NULL);
+        }
+        else {
+            for (int i = 0; i < worker_threads; ++i) {
+                hthread_create(worker_thread, NULL);
+            }
+        }
+    }
+    else {
+        if (g_worker_processes_num != 0) {
+            return ERR_OVER_LIMIT;
+        }
+        // master-workers processes
+#ifdef OS_UNIX
+        char proctitle[256] = {0};
+        snprintf(proctitle, sizeof(proctitle), "%s: master process", g_main_ctx.program_name);
+        setproctitle(proctitle);
+        signal(SIGNAL_RELOAD, signal_handler);
+#endif
+        g_worker_processes_num = worker_processes;
+        int bytes = g_worker_processes_num * sizeof(proc_ctx_t);
+        g_worker_processes = (proc_ctx_t*)malloc(bytes);
+        memset(g_worker_processes, 0, bytes);
+        proc_ctx_t* ctx = g_worker_processes;
+        for (int i = 0; i < g_worker_processes_num; ++i, ++ctx) {
+            ctx->init = worker_init;
+            ctx->proc = worker_proc;
+            spawn_proc(ctx);
+            hlogi("workers[%d] start/running, pid=%d", i, ctx->pid);
+        }
+        g_main_ctx.pid = getpid();
+        hlogi("master start/running, pid=%d", g_main_ctx.pid);
+        if (wait) {
+            while (1) sleep (1);
+        }
+    }
+    return 0;;
+}

+ 15 - 6
utils/hmain.h

@@ -7,10 +7,14 @@
 #include "hproc.h"
 
 typedef struct main_ctx_s {
-    pid_t   oldpid; // getpid_from_pidfile
-    pid_t   pid;    // getpid
     char    run_path[MAX_PATH];
     char    program_name[MAX_PATH];
+    char    confile[MAX_PATH]; // default etc/${program}.conf
+    char    pidfile[MAX_PATH]; // default logs/${program}.pid
+    char    logfile[MAX_PATH]; // default logs/${program}.log
+
+    pid_t   pid;    // getpid
+    pid_t   oldpid; // getpid_from_pidfile
 
     int     argc;
     int     arg_len;
@@ -23,10 +27,6 @@ typedef struct main_ctx_s {
     char**  os_envp;
     char**  save_envp;
 
-    char    confile[MAX_PATH]; // default etc/${program}.conf
-    char    pidfile[MAX_PATH]; // default logs/${program}.pid
-    char    logfile[MAX_PATH]; // default logs/${program}.log
-
     keyval_t    arg_kv;
     StringList  arg_list;
     keyval_t    env_kv;
@@ -82,6 +82,15 @@ void signal_handler(int signo);
 #define MAXNUM_WORKER_PROCESSES     1024
 extern main_ctx_t   g_main_ctx;
 extern int          g_worker_processes_num;
+extern int          g_worker_threads_num;
 extern proc_ctx_t*  g_worker_processes;
+extern procedure_t  g_worker_fn;
+extern void*        g_worker_userdata;
+
+// master-workers processes
+int master_workers_run(procedure_t worker_fn, void* worker_userdata,
+        int worker_processes = DEFAULT_WORKER_PROCESSES,
+        int worker_threads = 0,
+        bool wait = true);
 
 #endif // HV_MAIN_H_