Răsfoiți Sursa

add hthreadpool.h

ithewei 7 ani în urmă
părinte
comite
f2caa9b437
7 a modificat fișierele cu 199 adăugiri și 17 ștergeri
  1. 4 4
      .gitignore
  2. 46 0
      Makefile
  3. 1 0
      h.h
  4. 10 6
      hlog.cpp
  5. 1 0
      hlog.h
  6. 9 7
      hthread.h
  7. 128 0
      hthreadpool.h

+ 4 - 4
.gitignore

@@ -4,13 +4,13 @@
 *.org
 *.old
 
-# output
-*.o
-bin
-
 # IDE
 .vs
 .vscode
 
+# output
+*.o
+bin
+
 # test
 test

+ 46 - 0
Makefile

@@ -0,0 +1,46 @@
+CC = gcc
+CXX = g++
+MKDIR = mkdir -p
+RM = rm -r
+
+CPPFLAGS += -g -O2
+CFLAGS +=
+CXXFLAGS += -std=c++11
+
+INCDIR = include
+SRCDIR = src
+LIBDIR = lib
+BINDIR = bin
+
+TARGET = test
+ifeq ($(OS),Windows_NT)
+TARGET := $(addsuffix .exe, $(TARGET))
+endif
+
+INCFLAGS += -I.
+INCFLAGS += $(addprefix -I, $(INCDIR))
+CPPFLAGS += $(INCFLAGS)
+
+LDFLAGS += $(addprefix -L, $(LIBDIR))
+LDFLAGS += -lpthread
+
+DIRS += . $(SRCDIR) test
+SRCS += $(foreach dir, $(DIRS), $(wildcard $(dir)/*.c $(dir)/*.cc $(dir)/*.cpp))
+#OBJS := $(patsubst %.cpp, %.o, $(SRCS))
+OBJS := $(addsuffix .o, $(basename $(SRCS)))
+
+$(info DIRS=$(DIRS))
+$(info SRCS=$(SRCS))
+$(info OBJS=$(OBJS))
+
+all: prepare $(TARGET)
+
+prepare:
+	$(MKDIR) $(BINDIR)
+
+$(TARGET): $(OBJS)
+	$(CXX) $(CXXFLAGS) $(CPPFLAGS) $^ -o $(BINDIR)/$@ $(LDFLAGS)
+
+clean:
+	$(RM) $(OBJS)
+	$(RM) $(BINDIR)

+ 1 - 0
h.h

@@ -18,6 +18,7 @@
 #ifdef __cplusplus
 #include "hstring.h"
 #include "hthread.h"
+#include "hthreadpool.h"
 #include "hmutex.h"
 #include "hscope.h"
 #include "singleton.h"

+ 10 - 6
hlog.cpp

@@ -11,6 +11,7 @@
 static FILE* s_logfp = NULL;
 static char s_logfile[256] = DEFAULT_LOG_FILE;
 static int s_loglevel = DEFAULT_LOG_LEVEL;
+static bool s_logcolor = false;
 static char s_logbuf[LOGBUF_SIZE];
 static std::mutex s_mutex;
 
@@ -33,6 +34,10 @@ void hlog_set_level(int level){
     s_loglevel = level;
 }
 
+void hlog_enable_color(bool enable){
+    s_logcolor = enable;
+}
+
 int hlog_printf(int level, const char* fmt, ...) {
     if (level < s_loglevel)
         return -10;
@@ -47,9 +52,8 @@ int hlog_printf(int level, const char* fmt, ...) {
     }
 #undef CASE_LOG
 
-#ifdef _WIN32
-    pcolor = "";
-#endif
+    if (!s_logcolor)
+        pcolor = "";
 
     std::lock_guard<std::mutex> locker(s_mutex);
 
@@ -75,9 +79,9 @@ int hlog_printf(int level, const char* fmt, ...) {
     va_end(ap);
 
     fprintf(s_logfp, "%s\n", s_logbuf);
-#ifndef _WIN32
-    fprintf(s_logfp, CL_CLR);
-#endif
+    if (s_logcolor) {
+        fprintf(s_logfp, CL_CLR);
+    }
 
     fflush(NULL);
 

+ 1 - 0
hlog.h

@@ -40,6 +40,7 @@ enum LOG_LEVEL{
 
 int     hlog_set_file(const char* file);
 void    hlog_set_level(int level);
+void    hlog_enable_color(bool enable);
 int     hlog_printf(int level, const char* fmt, ...);
 
 #define hlogd(fmt, ...) hlog_printf(LOG_LEVEL_DEBUG, fmt " [%s:%d:%s]", ## __VA_ARGS__, __FILE__, __LINE__, __FUNCTION__)

+ 9 - 7
hthread.h

@@ -40,7 +40,13 @@ public:
     virtual int start() {
         if (status == STOP) {
             status = RUNNING;
-            thread = std::thread(&HThread::thread_proc, this);
+            dotask_cnt = 0;
+
+            thread = std::thread([this]{
+                doPrepare();
+                run();
+                doFinish();
+            });
         }
         return 0;
     }
@@ -67,12 +73,6 @@ public:
         return 0;
     }
 
-    void thread_proc() {
-        doPrepare();
-        run();
-        doFinish();
-    }
-
     virtual void run() {
         while (status != STOP) {
             while (status == PAUSE) {
@@ -80,6 +80,7 @@ public:
             }
 
             doTask();
+            dotask_cnt++;
 
             std::this_thread::yield();
         }
@@ -96,6 +97,7 @@ public:
         PAUSE,
     };
     std::atomic<Status> status;
+    uint32 dotask_cnt;
 };
 
 #endif // H_THREAD_H

+ 128 - 0
hthreadpool.h

@@ -0,0 +1,128 @@
+#ifndef H_THREAD_POOL_H
+#define H_THREAD_POOL_H
+
+#include <vector>
+#include <thread>
+#include <queue>
+#include <functional>
+#include <atomic>
+#include <mutex>
+#include <condition_variable>
+#include <future>
+
+#include "hlog.h"
+#include "hthread.h"
+
+class HThreadPool{
+public:
+    using Task = std::function<void()>;
+
+    HThreadPool(int size = std::thread::hardware_concurrency()) : pool_size(size), idle_num(size), status(STOP){
+
+    }
+
+    ~HThreadPool(){
+        stop();
+    }
+
+    int start() {
+        if (status == STOP) {
+            status = RUNNING;
+            for (int i = 0; i < pool_size; ++i){
+                workers.emplace_back(std::thread([this]{
+                    hlogd("work thread[%X] running...", gettid());
+                    while (status != STOP){
+                        while (status == PAUSE){
+                            std::this_thread::yield();
+                        }
+
+                        Task task;
+                        {
+                            std::unique_lock<std::mutex> locker(mutex);
+                            cond.wait(locker, [this]{
+                                return status == STOP || !tasks.empty();
+                            });
+
+                            if (status == STOP) return;
+                            
+                            if (!tasks.empty()){
+                                task = std::move(tasks.front());
+                                tasks.pop();
+                            }
+                        }
+
+                        --idle_num;
+                        task();
+                        ++idle_num;
+                    }    
+                }));
+            }
+        }
+        return 0;
+    }
+
+    int stop() {
+        if (status != STOP) {
+            status = STOP;
+            cond.notify_all();
+            for (auto& thread: workers){
+                thread.join();
+            }
+        }
+        return 0;
+    }
+
+    int pause() {
+        if (status == RUNNING) {
+            status = PAUSE;
+        }
+        return 0;
+    }
+
+    int resume() {
+        if (status == PAUSE) {
+            status = RUNNING;
+        }
+        return 0;
+    }
+
+    // return a future, calling future.get() will wait task done and return RetType.
+    // commit(fn, args...)
+    // commit(std::bind(&Class::mem_fn, &obj))
+    // commit(std::mem_fn(&Class::mem_fn, &obj))
+    template<class Fn, class... Args>
+    auto commit(Fn&& fn, Args&&... args) -> std::future<decltype(fn(args...))>{
+        using RetType = decltype(fn(args...));
+        auto task = std::make_shared<std::packaged_task<RetType()> >(
+            std::bind(std::forward<Fn>(fn), std::forward<Args>(args)...)
+        );
+        std::future<RetType> future = task->get_future();
+        {
+            std::lock_guard<std::mutex> locker(mutex);
+            tasks.emplace([task]{
+                (*task)();
+            });
+        }
+        
+        cond.notify_one();
+        return future;
+    }
+
+public:
+    int pool_size;
+    std::atomic<int> idle_num;
+
+    enum Status {
+        STOP,
+        RUNNING,
+        PAUSE,
+    };
+    std::atomic<Status> status;
+    std::vector<std::thread> workers;
+
+    std::queue<Task> tasks;
+    std::mutex        mutex;
+    std::condition_variable cond;
+};
+
+#endif // H_THREAD_POOL_H