ithewei 6 лет назад
Родитель
Сommit
9cc79d1e5b
6 измененных файлов с 282 добавлено и 206 удалено
  1. 3 2
      etc/test.conf
  2. 172 4
      hmain.cpp
  3. 19 4
      hmain.h
  4. 11 3
      hplatform.h
  5. 25 8
      hproc.h
  6. 52 185
      main.cpp.tmpl

+ 3 - 2
etc/test.conf

@@ -5,7 +5,8 @@
 loglevel = DEBUG
 log_remain_days = 3
 
-# if not set worker_processes, default = ncpu
-worker_processes = 4
+# worker_processes = 4
+# auto = ncpu
+worker_processes = auto
 
 port = 8086

+ 172 - 4
hmain.cpp

@@ -1,12 +1,12 @@
 #include "hmain.h"
 
-#include <signal.h> // for kill
-#include <errno.h>
-
 #include "hplatform.h"
 #include "hlog.h"
+#include "htime.h"
 
-main_ctx_t g_main_ctx;
+main_ctx_t  g_main_ctx;
+int         g_worker_processes_num = 0;
+proc_ctx_t* g_worker_processes = NULL;
 
 int main_ctx_init(int argc, char** argv) {
     g_main_ctx.pid = getpid();
@@ -314,3 +314,171 @@ pid_t getpid_from_pidfile() {
     return atoi(pid);
 }
 
+static procedure_t s_reload_fn = NULL;
+static void*       s_reload_userdata = NULL;
+#ifdef OS_UNIX
+// unix use signal
+#include <sys/wait.h>
+
+void signal_handler(int signo) {
+    hlogi("pid=%d recv signo=%d", getpid(), signo);
+    switch (signo) {
+    case SIGINT:
+    case SIGNAL_TERMINATE:
+        hlogi("killall processes");
+        signal(SIGCHLD, SIG_IGN);
+        for (int i = 0; i < g_worker_processes_num; ++i) {
+            if (g_worker_processes[i].pid <= 0) break;
+            kill(g_worker_processes[i].pid, SIGKILL);
+            g_worker_processes[i].pid = -1;
+        }
+        exit(0);
+        break;
+    case SIGNAL_RELOAD:
+        if (s_reload_fn) {
+            s_reload_fn(s_reload_userdata);
+            if (getpid_from_pidfile() == getpid()) {
+                // master raise SIGNAL_RELOAD => workers
+                for (int i = 0; i < g_worker_processes_num; ++i) {
+                    if (g_worker_processes[i].pid <= 0) break;
+                    kill(g_worker_processes[i].pid, SIGNAL_RELOAD);
+                }
+            }
+        }
+        break;
+    case SIGCHLD:
+    {
+        pid_t pid = 0;
+        int status = 0;
+        while ((pid = waitpid(-1, &status, WNOHANG)) > 0) {
+            hlogw("proc stop/waiting, pid=%d status=%d", pid, status);
+            for (int i = 0; i < g_worker_processes_num; ++i) {
+                if (g_worker_processes[i].pid == pid) {
+                    g_worker_processes[i].pid = -1;
+                    create_proc(&g_worker_processes[i]);
+                    break;
+                }
+            }
+        }
+    }
+        break;
+    default:
+        break;
+    }
+}
+
+int signal_init(procedure_t reload_fn, void* reload_userdata) {
+    s_reload_fn = reload_fn;
+    s_reload_userdata = reload_userdata;
+
+    signal(SIGINT, signal_handler);
+    signal(SIGCHLD, signal_handler);
+    signal(SIGNAL_TERMINATE, signal_handler);
+    signal(SIGNAL_RELOAD, signal_handler);
+
+    return 0;
+}
+
+#elif defined(OS_WIN)
+// win32 use Event
+static HANDLE s_hEventTerm = NULL;
+static HANDLE s_hEventReload = NULL;
+
+#include <mmsystem.h>
+#ifdef _MSC_VER
+#pragma comment(lib, "winmm.lib")
+#endif
+void WINAPI on_timer(UINT uTimerID, UINT uMsg, DWORD_PTR dwUser, DWORD_PTR dw1, DWORD_PTR dw2) {
+    DWORD ret = WaitForSingleObject(s_hEventTerm, 0);
+    if (ret == WAIT_OBJECT_0) {
+        timeKillEvent(uTimerID);
+        hlogi("pid=%d recv event [TERM]", getpid());
+        exit(0);
+    }
+
+    ret = WaitForSingleObject(s_hEventReload, 0);
+    if (ret == WAIT_OBJECT_0) {
+        hlogi("pid=%d recv event [RELOAD]", getpid());
+        if (s_reload_fn) {
+            s_reload_fn(s_reload_userdata);
+        }
+    }
+}
+
+void signal_cleanup() {
+    CloseHandle(s_hEventTerm);
+    s_hEventTerm = NULL;
+    CloseHandle(s_hEventReload);
+    s_hEventReload = NULL;
+}
+
+int signal_init(procedure_t reload_fn, void* reload_userdata) {
+    s_reload_fn = reload_fn;
+    s_reload_userdata = reload_userdata;
+
+    char eventname[MAX_PATH] = {0};
+    snprintf(eventname, sizeof(eventname), "%s_term_event", g_main_ctx.program_name);
+    s_hEventTerm = CreateEvent(NULL, FALSE, FALSE, eventname);
+    //s_hEventTerm = OpenEvent(EVENT_ALL_ACCESS, FALSE, eventname);
+    snprintf(eventname, sizeof(eventname), "%s_reload_event", g_main_ctx.program_name);
+    s_hEventReload = CreateEvent(NULL, FALSE, FALSE, eventname);
+
+    timeSetEvent(1000, 1000, on_timer, 0, TIME_PERIODIC);
+
+    atexit(signal_cleanup);
+    return 0;
+}
+#endif
+
+void handle_signal(const char* signal) {
+    if (strcmp(signal, "start") == 0) {
+        if (g_main_ctx.oldpid > 0) {
+            printf("%s is already running, pid=%d\n", g_main_ctx.program_name, g_main_ctx.oldpid);
+            exit(0);
+        }
+    } else if (strcmp(signal, "stop") == 0) {
+        if (g_main_ctx.oldpid > 0) {
+#ifdef OS_UNIX
+            kill(g_main_ctx.oldpid, SIGNAL_TERMINATE);
+#else
+            SetEvent(s_hEventTerm);
+#endif
+            printf("%s stop/waiting\n", g_main_ctx.program_name);
+        } else {
+            printf("%s is already stopped\n", g_main_ctx.program_name);
+        }
+        exit(0);
+    } else if (strcmp(signal, "restart") == 0) {
+        if (g_main_ctx.oldpid > 0) {
+#ifdef OS_UNIX
+            kill(g_main_ctx.oldpid, SIGNAL_TERMINATE);
+#else
+            SetEvent(s_hEventTerm);
+#endif
+            printf("%s stop/waiting\n", g_main_ctx.program_name);
+            msleep(1000);
+        }
+    } else if (strcmp(signal, "status") == 0) {
+        if (g_main_ctx.oldpid > 0) {
+            printf("%s start/running, pid=%d\n", g_main_ctx.program_name, g_main_ctx.oldpid);
+        } else {
+            printf("%s stop/waiting\n", g_main_ctx.program_name);
+        }
+        exit(0);
+    } else if (strcmp(signal, "reload") == 0) {
+        if (g_main_ctx.oldpid > 0) {
+            printf("reload confile [%s]\n", g_main_ctx.confile);
+#ifdef OS_UNIX
+            kill(g_main_ctx.oldpid, SIGNAL_RELOAD);
+#else
+            SetEvent(s_hEventReload);
+#endif
+        }
+        sleep(1);
+        exit(0);
+    } else {
+        printf("Invalid signal: '%s'\n", signal);
+        exit(0);
+    }
+    printf("%s start/running\n", g_main_ctx.program_name);
+}

+ 19 - 4
hmain.h

@@ -4,6 +4,7 @@
 #include "hplatform.h"
 #include "hdef.h"
 #include "hstring.h"
+#include "hproc.h"
 
 typedef struct main_ctx_s {
     pid_t   oldpid; // getpid_from_pidfile
@@ -29,12 +30,8 @@ typedef struct main_ctx_s {
     keyval_t    arg_kv;
     StringList  arg_list;
     keyval_t    env_kv;
-
-    void*   confile_parser; // deprecated
 } main_ctx_t;
 
-extern main_ctx_t g_main_ctx;
-
 // arg_type
 #define NO_ARGUMENT         0
 #define REQUIRED_ARGUMENT   1
@@ -65,8 +62,26 @@ const char* get_env(const char* key);
 void setproctitle(const char* title);
 #endif
 
+// pidfile
 int      create_pidfile();
 void     delete_pidfile();
 pid_t    getpid_from_pidfile();
 
+// signal=[start,stop,restart,status,reload]
+int signal_init(procedure_t reload_fn = NULL, void* reload_userdata = NULL);
+void handle_signal(const char* signal);
+#ifdef OS_UNIX
+// we use SIGTERM to quit process, SIGUSR1 to reload confile
+#define SIGNAL_TERMINATE    SIGTERM
+#define SIGNAL_RELOAD       SIGUSR1
+void signal_handler(int signo);
+#endif
+
+// global var
+#define DEFAULT_WORKER_PROCESSES    4
+#define MAXNUM_WORKER_PROCESSES     1024
+extern main_ctx_t   g_main_ctx;
+extern int          g_worker_processes_num;
+extern proc_ctx_t*  g_worker_processes;
+
 #endif // H_MAIN_H_

+ 11 - 3
hplatform.h

@@ -94,14 +94,22 @@
     #define strncasecmp strnicmp
     #define MKDIR(dir) mkdir(dir)
 #else
-    #include <unistd.h> // for daemon
+    #include <unistd.h>
+    #include <pthread.h>
     #include <dirent.h> // for mkdir,rmdir,chdir,getcwd
     #include <sys/time.h>  // for gettimeofday
 
-    #include <pthread.h>
+    // socket
+    #include <sys/types.h>
+    #include <sys/socket.h>
+    #include <arpa/inet.h>
+    #include <netinet/in.h>
+    #include <netinet/tcp.h>
+    #include <netinet/udp.h>
+    #include <fcntl.h>
+    #include <netdb.h> // for gethostbyname
 
     #include <strings.h>
-
     #define stricmp     strcasecmp
     #define strnicmp    strncasecmp
     #define MKDIR(dir) mkdir(dir, 0777)

+ 25 - 8
hproc.h

@@ -4,13 +4,15 @@
 #include "hplatform.h"
 #include "hdef.h"
 #include "hlog.h"
-#include "hmain.h"
 
 typedef struct proc_ctx_s {
     pid_t           pid; // tid in win32
-    char            proctitle[256];
+    procedure_t     init;
+    void*           init_userdata;
     procedure_t     proc;
-    void*           userdata;
+    void*           proc_userdata;
+    procedure_t     exit;
+    void*           exit_userdata;
 } proc_ctx_t;
 
 #ifdef OS_UNIX
@@ -23,11 +25,14 @@ inline int create_proc(proc_ctx_t* ctx) {
     } else if (pid == 0) {
         // child proc
         hlogi("proc start/running, pid=%d", getpid());
-        if (strlen(ctx->proctitle) != 0) {
-            setproctitle(ctx->proctitle);
+        if (ctx->init) {
+            ctx->init(ctx->init_userdata);
         }
         if (ctx->proc) {
-            ctx->proc(ctx->userdata);
+            ctx->proc(ctx->proc_userdata);
+        }
+        if (ctx->exit) {
+            ctx->exit(ctx->exit_userdata);
         }
         exit(0);
     } else if (pid > 0) {
@@ -38,15 +43,27 @@ inline int create_proc(proc_ctx_t* ctx) {
 }
 #elif defined(OS_WIN)
 // win32 use multi-threads
+static void win_thread(void* userdata) {
+    hlogi("proc start/running, tid=%d", GetCurrentThreadId());
+    proc_ctx_t* ctx = (proc_ctx_t*)userdata;
+    if (ctx->init) {
+        ctx->init(ctx->init_userdata);
+    }
+    if (ctx->proc) {
+        ctx->proc(ctx->proc_userdata);
+    }
+    if (ctx->exit) {
+        ctx->exit(ctx->exit_userdata);
+    }
+}
 inline int create_proc(proc_ctx_t* ctx) {
-    HANDLE h = (HANDLE)_beginthread(ctx->proc, 0, ctx->userdata);
+    HANDLE h = (HANDLE)_beginthread(win_thread, 0, ctx);
     if (h == NULL) {
         hloge("_beginthread error: %d", errno);
         return -1;
     }
     int tid = GetThreadId(h);
     ctx->pid = tid;
-    hlogi("proc start/running, tid=%d", tid);
     return tid;
 }
 #endif

+ 52 - 185
main.cpp.tmpl

@@ -1,8 +1,5 @@
 #include "h.h"
-
-#define DEFAULT_WORKER_PROCESSES    4
-#define MAXNUM_WORKER_PROCESSES     1024
-static proc_ctx_t s_worker_processes[MAXNUM_WORKER_PROCESSES];
+#include "hmain.h"
 
 typedef struct conf_ctx_s {
     IniParser* parser;
@@ -24,11 +21,8 @@ static void print_help();
 
 static int  parse_confile(const char* confile);
 
-static int  signal_init();
-static void signal_cleanup();
-static void handle_signal();
-
 static void master_proc(void* userdata);
+static void worker_init(void* userdata);
 static void worker_proc(void* userdata);
 
 // short options
@@ -64,7 +58,6 @@ void print_help() {
 }
 
 int parse_confile(const char* confile) {
-    conf_ctx_init(&g_conf_ctx);
     int ret = g_conf_ctx.parser->LoadFromFile(confile);
     if (ret != 0) {
         printf("Load confile [%s] failed: %d\n", confile, ret);
@@ -104,15 +97,17 @@ int parse_confile(const char* confile) {
 
     // worker_processes
     int worker_processes = 0;
-    worker_processes = atoi(g_conf_ctx.parser->GetValue("worker_processes").c_str());
-    if (worker_processes <= 0 || worker_processes > MAXNUM_WORKER_PROCESSES) {
-        worker_processes = get_ncpu();
-        hlogd("worker_processes=ncpu=%d", worker_processes);
-    }
-    if (worker_processes <= 0 || worker_processes > MAXNUM_WORKER_PROCESSES) {
-        worker_processes = DEFAULT_WORKER_PROCESSES;
+    str = g_conf_ctx.parser->GetValue("worker_processes");
+    if (str.size() != 0) {
+        if (strcmp(str.c_str(), "auto") == 0) {
+            worker_processes = get_ncpu();
+            hlogd("worker_processes=ncpu=%d", worker_processes);
+        }
+        else {
+            worker_processes = atoi(str.c_str());
+        }
     }
-    g_conf_ctx.worker_processes = worker_processes;
+    g_conf_ctx.worker_processes = LIMIT(0, worker_processes, MAXNUM_WORKER_PROCESSES);
 
     // port
     int port = 0;
@@ -132,164 +127,18 @@ int parse_confile(const char* confile) {
     return 0;
 }
 
+void worker_init(void* userdata) {
 #ifdef OS_UNIX
-// unix use signal
-// we use SIGTERM to quit process, SIGUSR1 to reload confile
-#define SIGNAL_TERMINATE    SIGTERM
-#define SIGNAL_RELOAD       SIGUSR1
-#include <sys/wait.h>
-
-void signal_handler(int signo) {
-    hlogi("pid=%d recv signo=%d", getpid(), signo);
-    switch (signo) {
-    case SIGINT:
-    case SIGNAL_TERMINATE:
-        hlogi("killall processes");
-        signal(SIGCHLD, SIG_IGN);
-        for (int i = 0; i < MAXNUM_WORKER_PROCESSES; ++i) {
-            if (s_worker_processes[i].pid <= 0) break;
-            kill(s_worker_processes[i].pid, SIGKILL);
-            s_worker_processes[i].pid = -1;
-        }
-        exit(0);
-        break;
-    case SIGNAL_RELOAD:
-        hlogi("reload confile [%s]", g_main_ctx.confile);
-        parse_confile(g_main_ctx.confile);
-        break;
-    case SIGCHLD:
-    {
-        pid_t pid = 0;
-        int status = 0;
-        while ((pid = waitpid(-1, &status, WNOHANG)) > 0) {
-            hlogw("proc stop/waiting, pid=%d status=%d", pid, status);
-            for (int i = 0; i < MAXNUM_WORKER_PROCESSES; ++i) {
-                if (s_worker_processes[i].pid == pid) {
-                    s_worker_processes[i].pid = -1;
-                    create_proc(&s_worker_processes[i]);
-                    break;
-                }
-            }
-        }
-    }
-        break;
-    default:
-        break;
-    }
-}
-
-int signal_init() {
-    signal(SIGINT, signal_handler);
-    signal(SIGCHLD, signal_handler);
-    signal(SIGNAL_TERMINATE, signal_handler);
+    char proctitle[256] = {0};
+    snprintf(proctitle, sizeof(proctitle), "%s: worker process", g_main_ctx.program_name);
+    setproctitle(proctitle);
     signal(SIGNAL_RELOAD, signal_handler);
-
-    atexit(signal_cleanup);
-    return 0;
-}
-
-void signal_cleanup() {
-}
-#elif defined(OS_WIN)
-// win32 use Event
-static HANDLE s_hEventTerm = NULL;
-static HANDLE s_hEventReload = NULL;
-
-#include <mmsystem.h>
-#ifdef _MSC_VER
-#pragma comment(lib, "winmm.lib")
 #endif
-void WINAPI on_timer(UINT uTimerID, UINT uMsg, DWORD_PTR dwUser, DWORD_PTR dw1, DWORD_PTR dw2) {
-    DWORD ret = WaitForSingleObject(s_hEventTerm, 0);
-    if (ret == WAIT_OBJECT_0) {
-        timeKillEvent(uTimerID);
-        hlogi("pid=%d recv event [TERM]", getpid());
-        exit(0);
-    }
-
-    ret = WaitForSingleObject(s_hEventReload, 0);
-    if (ret == WAIT_OBJECT_0) {
-        hlogi("pid=%d recv event [RELOAD]", getpid());
-        parse_confile(g_main_ctx.confile);
-    }
 }
 
-int signal_init() {
-    char eventname[MAX_PATH] = {0};
-    snprintf(eventname, sizeof(eventname), "%s_term_event", g_main_ctx.program_name);
-    s_hEventTerm = CreateEvent(NULL, FALSE, FALSE, eventname);
-    //s_hEventTerm = OpenEvent(EVENT_ALL_ACCESS, FALSE, eventname);
-    snprintf(eventname, sizeof(eventname), "%s_reload_event", g_main_ctx.program_name);
-    s_hEventReload = CreateEvent(NULL, FALSE, FALSE, eventname);
-
-    timeSetEvent(1000, 1000, on_timer, 0, TIME_PERIODIC);
-
-    atexit(signal_cleanup);
-    return 0;
-}
-
-void signal_cleanup() {
-    CloseHandle(s_hEventTerm);
-    s_hEventTerm = NULL;
-    CloseHandle(s_hEventReload);
-    s_hEventReload = NULL;
-}
-#endif
-
-void handle_signal() {
-    const char* signal = get_arg("s");
-    if (signal) {
-        if (strcmp(signal, "start") == 0) {
-            if (g_main_ctx.oldpid > 0) {
-                printf("%s is already running, pid=%d\n", g_main_ctx.program_name, g_main_ctx.oldpid);
-                exit(0);
-            }
-        } else if (strcmp(signal, "stop") == 0) {
-            if (g_main_ctx.oldpid > 0) {
-#ifdef OS_UNIX
-                kill(g_main_ctx.oldpid, SIGNAL_TERMINATE);
-#else
-                SetEvent(s_hEventTerm);
-#endif
-                printf("%s stop/waiting\n", g_main_ctx.program_name);
-            } else {
-                printf("%s is already stopped\n", g_main_ctx.program_name);
-            }
-            exit(0);
-        } else if (strcmp(signal, "restart") == 0) {
-            if (g_main_ctx.oldpid > 0) {
-#ifdef OS_UNIX
-                kill(g_main_ctx.oldpid, SIGNAL_TERMINATE);
-#else
-                SetEvent(s_hEventTerm);
-#endif
-                printf("%s stop/waiting\n", g_main_ctx.program_name);
-                msleep(1000);
-            }
-        } else if (strcmp(signal, "status") == 0) {
-            if (g_main_ctx.oldpid > 0) {
-                printf("%s start/running, pid=%d\n", g_main_ctx.program_name, g_main_ctx.oldpid);
-            } else {
-                printf("%s stop/waiting\n", g_main_ctx.program_name);
-            }
-            exit(0);
-        } else if (strcmp(signal, "reload") == 0) {
-            if (g_main_ctx.oldpid > 0) {
-                printf("reload confile [%s]\n", g_main_ctx.confile);
-#ifdef __unix__
-                kill(g_main_ctx.oldpid, SIGNAL_RELOAD);
-#else
-                SetEvent(s_hEventReload);
-#endif
-            }
-            sleep(1);
-            exit(0);
-        } else {
-            printf("Invalid signal: '%s'\n", signal);
-            exit(0);
-        }
-        printf("%s start/running\n", g_main_ctx.program_name);
-    }
+static void on_reload(void* userdata) {
+    hlogi("reload confile [%s]", g_main_ctx.confile);
+    parse_confile(g_main_ctx.confile);
 }
 
 int main(int argc, char** argv) {
@@ -349,6 +198,7 @@ int main(int argc, char** argv) {
     }
 
     // g_conf_ctx
+    conf_ctx_init(&g_conf_ctx);
     parse_confile(g_main_ctx.confile);
 
     // test
@@ -358,8 +208,11 @@ int main(int argc, char** argv) {
     }
 
     // signal
-    signal_init();
-    handle_signal();
+    signal_init(on_reload);
+    const char* signal = get_arg("s");
+    if (signal) {
+        handle_signal(signal);
+    }
 
 #ifdef OS_UNIX
     // daemon
@@ -373,26 +226,40 @@ int main(int argc, char** argv) {
         // parent process exit after daemon, so pid changed.
         g_main_ctx.pid = getpid();
     }
-    // proctitle
-    char proctitle[256] = {0};
-    snprintf(proctitle, sizeof(proctitle), "%s: master process", g_main_ctx.program_name);
-    setproctitle(proctitle);
 #endif
 
     // pidfile
     create_pidfile();
     hlogi("%s start/running, pid=%d", g_main_ctx.program_name, g_main_ctx.pid);
 
-    // master-worker proc
-    memset(s_worker_processes, 0, sizeof(s_worker_processes));
-    for (int i = 0; i < g_conf_ctx.worker_processes; ++i) {
-        proc_ctx_t* ctx = &s_worker_processes[i];
-        snprintf(ctx->proctitle, sizeof(ctx->proctitle), "%s: worker process", g_main_ctx.program_name);
-        ctx->proc = worker_proc;
-        ctx->userdata = NULL;
-        create_proc(ctx);
+    if (g_conf_ctx.worker_processes == 0) {
+        // single process
+        worker_proc(NULL);
+    }
+    else {
+        // master-workers processes
+        g_worker_processes_num = g_conf_ctx.worker_processes;
+        int bytes = sizeof(proc_ctx_t) * g_worker_processes_num;
+        g_worker_processes = (proc_ctx_t*)malloc(bytes);
+        if (g_worker_processes == NULL) {
+            return ERR_MALLOC;
+        }
+        memset(g_worker_processes, 0, bytes);
+        for (int i = 0; i < g_worker_processes_num; ++i) {
+            proc_ctx_t* ctx = &g_worker_processes[i];
+            ctx->init = worker_init;
+            ctx->init_userdata = NULL;
+            ctx->proc = worker_proc;
+            ctx->proc_userdata = NULL;
+            create_proc(ctx);
+        }
+#ifdef OS_UNIX
+        char proctitle[256] = {0};
+        snprintf(proctitle, sizeof(proctitle), "%s: master process", g_main_ctx.program_name);
+        setproctitle(proctitle);
+#endif
+        master_proc(NULL);
     }
-    master_proc(NULL);
 
     return 0;
 }