소스 검색

Impl hv::async

ithewei 4 년 전
부모
커밋
8cb296f94f
9개의 변경된 파일228개의 추가작업 그리고 64개의 파일을 삭제
  1. 1 0
      Makefile.vars
  2. 1 0
      cmake/vars.cmake
  3. 1 0
      cpputil/README.md
  4. 7 0
      cpputil/hasync.cpp
  5. 48 0
      cpputil/hasync.h
  6. 162 57
      cpputil/hthreadpool.h
  7. 3 4
      examples/httpd/router.cpp
  8. 4 2
      http/HttpMessage.cpp
  9. 1 1
      unittest/threadpool_test.cpp

+ 1 - 0
Makefile.vars

@@ -43,6 +43,7 @@ CPPUTIL_HEADERS = cpputil/hmap.h\
 				cpputil/hurl.h\
 				cpputil/hscope.h\
 				cpputil/hthreadpool.h\
+				cpputil/hasync.h\
 				cpputil/hobjectpool.h\
 				cpputil/ifconfig.h\
 				cpputil/iniparser.h\

+ 1 - 0
cmake/vars.cmake

@@ -44,6 +44,7 @@ set(CPPUTIL_HEADERS
     cpputil/hurl.h
     cpputil/hscope.h
     cpputil/hthreadpool.h
+    cpputil/hasync.h
     cpputil/hobjectpool.h
     cpputil/ifconfig.h
     cpputil/iniparser.h

+ 1 - 0
cpputil/README.md

@@ -2,6 +2,7 @@
 
 ```
 .
+├── hasync.h        hv::async实现
 ├── hdir.h          目录(ls实现)
 ├── hfile.h         文件类
 ├── hobjectpool.h   对象池

+ 7 - 0
cpputil/hasync.cpp

@@ -0,0 +1,7 @@
+#include "hasync.h"
+
+namespace hv {
+
+SINGLETON_IMPL(GlobalThreadPool)
+
+}

+ 48 - 0
cpputil/hasync.h

@@ -0,0 +1,48 @@
+#ifndef HV_ASYNC_H_
+#define HV_ASYNC_H_
+
+#include "hexport.h"
+#include "hthreadpool.h"
+#include "singleton.h"
+
+namespace hv {
+
+class HV_EXPORT GlobalThreadPool : public HThreadPool {
+    SINGLETON_DECL(GlobalThreadPool)
+protected:
+    GlobalThreadPool() : HThreadPool() {}
+};
+
+/*
+ * return a future, calling future.get() will wait task done and return RetType.
+ * async(fn, args...)
+ * async(std::bind(&Class::mem_fn, &obj))
+ * async(std::mem_fn(&Class::mem_fn, &obj))
+ *
+ */
+template<class Fn, class... Args>
+auto async(Fn&& fn, Args&&... args) -> std::future<decltype(fn(args...))> {
+    return GlobalThreadPool::instance()->commit(std::forward<Fn>(fn), std::forward<Args>(args)...);
+}
+
+class async {
+public:
+    static void startup(int min_threads = DEFAULT_THREAD_POOL_MIN_THREAD_NUM,
+                 int max_threads = DEFAULT_THREAD_POOL_MAX_THREAD_NUM,
+                 int max_idle_ms = DEFAULT_THREAD_POOL_MAX_IDLE_TIME) {
+        GlobalThreadPool* gtp = GlobalThreadPool::instance();
+        if (gtp->isStarted()) return;
+        gtp->setMinThreadNum(min_threads);
+        gtp->setMaxThreadNum(max_threads);
+        gtp->setMaxIdleTime(max_idle_ms);
+        gtp->start();
+    }
+
+    static void cleanup() {
+        GlobalThreadPool::exitInstance();
+    }
+};
+
+} // end namespace hv
+
+#endif // HV_ASYNC_H_

+ 162 - 57
cpputil/hthreadpool.h

@@ -1,8 +1,9 @@
 #ifndef HV_THREAD_POOL_H_
 #define HV_THREAD_POOL_H_
 
-#include <vector>
+#include <time.h>
 #include <thread>
+#include <list>
 #include <queue>
 #include <functional>
 #include <atomic>
@@ -11,62 +12,76 @@
 #include <future>
 #include <memory>
 #include <utility>
+#include <chrono>
+
+#define DEFAULT_THREAD_POOL_MIN_THREAD_NUM  1
+#define DEFAULT_THREAD_POOL_MAX_THREAD_NUM  std::thread::hardware_concurrency()
+#define DEFAULT_THREAD_POOL_MAX_IDLE_TIME   60000 // ms
 
 class HThreadPool {
 public:
     using Task = std::function<void()>;
 
-    HThreadPool(int size = std::thread::hardware_concurrency())
-        : pool_size(size), idle_num(size), status(STOP) {
-    }
-
-    ~HThreadPool() {
+    HThreadPool(int min_threads = DEFAULT_THREAD_POOL_MIN_THREAD_NUM,
+                int max_threads = DEFAULT_THREAD_POOL_MAX_THREAD_NUM,
+                int max_idle_ms = DEFAULT_THREAD_POOL_MAX_IDLE_TIME)
+        : min_thread_num(min_threads)
+        , max_thread_num(max_threads)
+        , max_idle_time(max_idle_ms)
+        , status(STOP)
+        , cur_thread_num(0)
+        , idle_thread_num(0)
+    {}
+
+    virtual ~HThreadPool() {
         stop();
     }
 
-    int start() {
-        if (status == STOP) {
-            status = RUNNING;
-            for (int i = 0; i < pool_size; ++i) {
-                workers.emplace_back(std::thread([this]{
-                    while (status != STOP) {
-                        while (status == PAUSE) {
-                            std::this_thread::yield();
-                        }
-
-                        Task task;
-                        {
-                            std::unique_lock<std::mutex> locker(_mutex);
-                            _cond.wait(locker, [this]{
-                                return status == STOP || !tasks.empty();
-                            });
-
-                            if (status == STOP) return;
-
-                            if (!tasks.empty()) {
-                                --idle_num;
-                                task = std::move(tasks.front());
-                                tasks.pop();
-                            }
-                        }
+    void setMinThreadNum(int min_threads) {
+        min_thread_num = min_threads;
+    }
+    void setMaxThreadNum(int max_threads) {
+        max_thread_num = max_threads;
+    }
+    void setMaxIdleTime(int ms) {
+        max_idle_time = ms;
+    }
+    int currentThreadNum() {
+        return cur_thread_num;
+    }
+    int idleThreadNum() {
+        return idle_thread_num;
+    }
+    bool isStarted() {
+        return status != STOP;
+    }
+    bool isStopped() {
+        return status == STOP;
+    }
 
-                        task();
-                        ++idle_num;
-                    }
-                }));
-            }
+    int start(int start_threads = 0) {
+        if (status != STOP) return -1;
+        status = RUNNING;
+        if (start_threads < min_thread_num) start_threads = min_thread_num;
+        if (start_threads > max_thread_num) start_threads = max_thread_num;
+        for (int i = 0; i < start_threads; ++i) {
+            createThread();
         }
         return 0;
     }
 
     int stop() {
-        if (status != STOP) {
-            status = STOP;
-            _cond.notify_all();
-            for (auto& worker : workers) {
-                worker.join();
+        if (status == STOP) return -1;
+        status = STOP;
+        task_cond.notify_all();
+        for (auto& i : threads) {
+            if (i.thread->joinable()) {
+                i.thread->join();
             }
         }
+        threads.clear();
+        cur_thread_num = 0;
+        idle_thread_num = 0;
         return 0;
     }
 
@@ -86,7 +101,7 @@ public:
 
     int wait() {
         while (1) {
-            if (status == STOP || (tasks.empty() && idle_num == pool_size)) {
+            if (status == STOP || (tasks.empty() && idle_thread_num == cur_thread_num)) {
                 break;
             }
             std::this_thread::yield();
@@ -94,42 +109,132 @@ public:
         return 0;
     }
 
-    // return a future, calling future.get() will wait task done and return RetType.
-    // commit(fn, args...)
-    // commit(std::bind(&Class::mem_fn, &obj))
-    // commit(std::mem_fn(&Class::mem_fn, &obj))
+    /*
+     * return a future, calling future.get() will wait task done and return RetType.
+     * commit(fn, args...)
+     * commit(std::bind(&Class::mem_fn, &obj))
+     * commit(std::mem_fn(&Class::mem_fn, &obj))
+     *
+     */
     template<class Fn, class... Args>
     auto commit(Fn&& fn, Args&&... args) -> std::future<decltype(fn(args...))> {
+        if (status == STOP) start();
+        if (idle_thread_num == 0 && cur_thread_num < max_thread_num) {
+            createThread();
+        }
         using RetType = decltype(fn(args...));
         auto task = std::make_shared<std::packaged_task<RetType()> >(
             std::bind(std::forward<Fn>(fn), std::forward<Args>(args)...));
         std::future<RetType> future = task->get_future();
         {
-            std::lock_guard<std::mutex> locker(_mutex);
+            std::lock_guard<std::mutex> locker(task_mutex);
             tasks.emplace([task]{
                 (*task)();
             });
         }
 
-        _cond.notify_one();
+        task_cond.notify_one();
         return future;
     }
 
+protected:
+    bool createThread() {
+        if (cur_thread_num >= max_thread_num) return false;
+        std::thread* thread = new std::thread([this] {
+            while (status != STOP) {
+                while (status == PAUSE) {
+                    std::this_thread::yield();
+                }
+
+                Task task;
+                {
+                    std::unique_lock<std::mutex> locker(task_mutex);
+                    task_cond.wait_for(locker, std::chrono::milliseconds(max_idle_time), [this]() {
+                        return status == STOP || !tasks.empty();
+                    });
+                    if (status == STOP) return;
+                    if (tasks.empty()) {
+                        if (cur_thread_num > min_thread_num) {
+                            delThread(std::this_thread::get_id());
+                            return;
+                        }
+                        continue;
+                    }
+                    --idle_thread_num;
+                    task = std::move(tasks.front());
+                    tasks.pop();
+                }
+                if (task) {
+                    task();
+                    ++idle_thread_num;
+                }
+            }
+        });
+        addThread(thread);
+        return true;
+    }
+
+    void addThread(std::thread* thread) {
+        thread_mutex.lock();
+        ++cur_thread_num;
+        ++idle_thread_num;
+        ThreadData data;
+        data.thread = std::shared_ptr<std::thread>(thread);
+        data.id = thread->get_id();
+        data.status = RUNNING;
+        data.start_time = time(NULL);
+        threads.emplace_back(data);
+        thread_mutex.unlock();
+    }
+
+    void delThread(std::thread::id id) {
+        time_t now = time(NULL);
+        thread_mutex.lock();
+        --cur_thread_num;
+        --idle_thread_num;
+        auto iter = threads.begin();
+        while (iter != threads.end()) {
+            if (iter->status == STOP && now > iter->stop_time) {
+                if (iter->thread->joinable()) {
+                    iter->thread->join();
+                    iter = threads.erase(iter);
+                    continue;
+                }
+            } else if (iter->id == id) {
+                iter->status = STOP;
+                iter->stop_time = time(NULL);
+            }
+            ++iter;
+        }
+        thread_mutex.unlock();
+    }
+
 public:
+    int min_thread_num;
+    int max_thread_num;
+    int max_idle_time;
+
+protected:
     enum Status {
         STOP,
         RUNNING,
         PAUSE,
     };
-    int                 pool_size;
-    std::atomic<int>    idle_num;
-    std::atomic<Status> status;
-    std::vector<std::thread>    workers;
-    std::queue<Task>            tasks;
-
-protected:
-    std::mutex              _mutex;
-    std::condition_variable _cond;
+    struct ThreadData {
+        std::shared_ptr<std::thread> thread;
+        std::thread::id id;
+        Status          status;
+        time_t          start_time;
+        time_t          stop_time;
+    };
+    std::atomic<Status>     status;
+    std::atomic<int>        cur_thread_num;
+    std::atomic<int>        idle_thread_num;
+    std::list<ThreadData>   threads;
+    std::mutex              thread_mutex;
+    std::queue<Task>        tasks;
+    std::mutex              task_mutex;
+    std::condition_variable task_cond;
 };
 
 #endif // HV_THREAD_POOL_H_

+ 3 - 4
examples/httpd/router.cpp

@@ -1,10 +1,9 @@
 #include "router.h"
 
-#include <future> // import std::async
-
 #include "handler.h"
 #include "hthread.h"
-#include "requests.h"
+#include "hasync.h"     // import hv::async
+#include "requests.h"   // import requests::async
 
 void Router::Register(hv::HttpService& router) {
     // preprocessor => Handler => postprocessor
@@ -69,7 +68,7 @@ void Router::Register(hv::HttpService& router) {
     // curl -v http://ip:port/async
     router.GET("/async", [](const HttpRequestPtr& req, const HttpResponseWriterPtr& writer) {
         writer->WriteHeader("X-Request-tid", hv_gettid());
-        std::async([req, writer](){
+        hv::async([req, writer](){
             writer->WriteHeader("X-Response-tid", hv_gettid());
             writer->WriteHeader("Content-Type", "text/plain");
             writer->WriteBody("This is an async response.\n");

+ 4 - 2
http/HttpMessage.cpp

@@ -303,8 +303,10 @@ void HttpMessage::FillContentLength() {
         DumpBody();
         content_length = body.size();
     }
-    if (iter == headers.end() && content_length != 0 && !IsChunked()) {
-        headers["Content-Length"] = hv::to_string(content_length);
+    if (iter == headers.end() && !IsChunked()) {
+        if (content_length != 0 || type == HTTP_RESPONSE) {
+            headers["Content-Length"] = hv::to_string(content_length);
+        }
     }
 }
 

+ 1 - 1
unittest/threadpool_test.cpp

@@ -10,7 +10,7 @@ void print_task(int i) {
 }
 
 int main(int argc, char** argv) {
-    HThreadPool tp(4);
+    HThreadPool tp(1, 4);
     tp.start();
 
     int i = 0;