فهرست منبع

Impl #172: defaultLargeFileHander and limit_rate

ithewei 3 سال پیش
والد
کامیت
47493a6eed

+ 1 - 0
etc/httpd.conf

@@ -26,6 +26,7 @@ document_root = html
 home_page = index.html
 #error_page = error.html
 index_of = /downloads/
+limit_rate = 500 # KB/s
 
 # SSL/TLS
 ssl_certificate = cert/server.crt

+ 15 - 11
examples/httpd/handler.cpp

@@ -89,16 +89,28 @@ int Handler::largeFileHandler(const HttpContextPtr& ctx) {
         ctx->writer->EndHeaders();
 
         char* buf = NULL;
-        int len = 4096; // 4K
+        int len = 40960; // 40K
         SAFE_ALLOC(buf, len);
         size_t total_readbytes = 0;
         int last_progress = 0;
-        int sendbytes_per_ms = 1024; // 1KB/ms = 1MB/s = 8Mbps
-        int sleep_ms_per_send = len / sendbytes_per_ms; // 4ms
+        int sleep_ms_per_send = 0;
+        if (ctx->service->limit_rate <= 0) {
+            // unlimited
+        } else {
+            sleep_ms_per_send = len * 1000 / 1024 / ctx->service->limit_rate;
+        }
+        if (sleep_ms_per_send == 0) sleep_ms_per_send = 1;
         int sleep_ms = sleep_ms_per_send;
         auto start_time = std::chrono::steady_clock::now();
         auto end_time = start_time;
         while (total_readbytes < filesize) {
+            if (!ctx->writer->isConnected()) {
+                break;
+            }
+            if (!ctx->writer->isWriteComplete()) {
+                hv_delay(1);
+                continue;
+            }
             size_t readbytes = file.read(buf, len);
             if (readbytes <= 0) {
                 // read file error!
@@ -109,14 +121,6 @@ int Handler::largeFileHandler(const HttpContextPtr& ctx) {
             if (nwrite < 0) {
                 // disconnected!
                 break;
-            } else if (nwrite == 0) {
-                // send too fast or peer recv too slow
-                // reduce speed of send
-                sleep_ms *= 2;
-                // size_t write_backlog = hio_write_bufsize(ctx->writer->io());
-            } else {
-                // restore speed of send
-                sleep_ms = sleep_ms_per_send;
             }
             total_readbytes += readbytes;
             int cur_progress = total_readbytes * 100 / filesize;

+ 5 - 0
examples/httpd/httpd.cpp

@@ -163,6 +163,11 @@ int parse_confile(const char* confile) {
     if (str.size() != 0) {
         g_http_service.index_of = str;
     }
+    // limit_rate
+    str = ini.GetValue("limit_rate");
+    if (str.size() != 0) {
+        g_http_service.limit_rate = atoi(str.c_str());
+    }
     // ssl
     if (g_http_server.https_port > 0) {
         std::string crt_file = ini.GetValue("ssl_certificate");

+ 1 - 1
examples/httpd/router.cpp

@@ -9,7 +9,7 @@ void Router::Register(hv::HttpService& router) {
     // preprocessor => Handler => postprocessor
     router.preprocessor = Handler::preprocessor;
     router.postprocessor = Handler::postprocessor;
-    router.largeFileHandler = Handler::largeFileHandler;
+    // router.largeFileHandler = Handler::largeFileHandler;
     // router.errorHandler = Handler::errorHandler;
 
     // curl -v http://ip:port/ping

+ 26 - 15
examples/wget.cpp

@@ -10,7 +10,7 @@ using namespace hv;
 
 typedef std::function<void(size_t received_bytes, size_t total_bytes)> wget_progress_cb;
 
-static int wget(const char* url, const char* filepath, wget_progress_cb progress_cb = NULL) {
+static int wget(const char* url, const char* filepath, wget_progress_cb progress_cb = NULL, bool use_range = true) {
     int ret = 0;
     HttpClient cli;
     HttpRequest req;
@@ -31,16 +31,18 @@ static int wget(const char* url, const char* filepath, wget_progress_cb progress
     }
 
     // use Range?
-    bool use_range = false;
     int range_bytes = 1 << 20; // 1M
     long from = 0, to = 0;
-    std::string accept_ranges = resp.GetHeader("Accept-Ranges");
     size_t content_length = hv::from_string<size_t>(resp.GetHeader("Content-Length"));
-    // use Range if server accept_ranges and content_length > 1M
-    if (resp.status_code == 200 &&
-        accept_ranges == "bytes" &&
-        content_length > range_bytes) {
-        use_range = true;
+    if (use_range) {
+        use_range = false;
+        std::string accept_ranges = resp.GetHeader("Accept-Ranges");
+        // use Range if server accept_ranges and content_length > 1M
+        if (resp.status_code == 200 &&
+            accept_ranges == "bytes" &&
+            content_length > range_bytes) {
+            use_range = true;
+        }
     }
 
     // open file
@@ -104,19 +106,25 @@ static int wget(const char* url, const char* filepath, wget_progress_cb progress
     return 0;
 error:
     file.close();
-    remove(filepath);
+    // remove(filepath);
     return ret;
 }
 
 int main(int argc, char** argv) {
     if (argc < 2) {
-        printf("Usage: %s url [filepath]\n", argv[0]);
+        printf("Usage: %s [--use_range] url [filepath]\n", argv[0]);
         return -10;
     }
-    const char* url = argv[1];
+    int idx = 1;
+    bool use_range = false;
+    if (strcmp(argv[idx], "--use_range") == 0) {
+        use_range = true;
+        ++idx;
+    }
+    const char* url = argv[idx++];
     const char* filepath = "index.html";
-    if (argc > 2) {
-        filepath = argv[2];
+    if (argv[idx]) {
+        filepath = argv[idx];
     } else {
         const char* path = strrchr(url, '/');
         if (path && path[1]) {
@@ -138,9 +146,12 @@ int main(int argc, char** argv) {
             }
         }
         fflush(stdout);
-    });
+    }, use_range);
     unsigned int end_time = gettick_ms();
-    printf("\ncost time %u ms\n", end_time - start_time);
+    unsigned int cost_time = end_time - start_time;
+    printf("\ncost time %u ms\n", cost_time);
+    // 1B/ms = 1KB/s = 8Kbps
+    printf("download rate = %lu KB/s\n", (unsigned long)hv_filesize(filepath) / cost_time);
 
     return 0;
 }

+ 14 - 12
http/HttpMessage.h

@@ -111,7 +111,7 @@ public:
 
     // structured content
     void*               content;    // DATA_NO_COPY
-    int                 content_length;
+    size_t              content_length;
     http_content_type   content_type;
 #ifndef WITHOUT_HTTP_CONTENT
     hv::Json            json;       // APPLICATION_JSON
@@ -316,7 +316,7 @@ public:
         return content;
     }
 
-    int ContentLength() {
+    size_t ContentLength() {
         if (content_length == 0) {
             FillContentLength();
         }
@@ -329,6 +329,15 @@ public:
         }
         return content_type;
     }
+    void SetContentTypeByFilename(const char* filepath) {
+        const char* suffix = hv_suffixname(filepath);
+        if (suffix) {
+            content_type = http_content_type_enum_by_suffix(suffix);
+        }
+        if (content_type == CONTENT_TYPE_NONE || content_type == CONTENT_TYPE_UNDEFINED) {
+            content_type = APPLICATION_OCTET_STREAM;
+        }
+    }
 
     void AddCookie(const HttpCookie& cookie) {
         cookies.push_back(cookie);
@@ -354,20 +363,13 @@ public:
         return 200;
     }
 
-    int File(const char* filepath, bool read = true) {
+    int File(const char* filepath) {
         HFile file;
         if (file.open(filepath, "rb") != 0) {
             return HTTP_STATUS_NOT_FOUND;
         }
-        const char* suffix = hv_suffixname(filepath);
-        if (suffix) {
-            content_type = http_content_type_enum_by_suffix(suffix);
-        }
-        if (content_type == CONTENT_TYPE_NONE || content_type == CONTENT_TYPE_UNDEFINED) {
-            content_type = APPLICATION_OCTET_STREAM;
-        }
-        if (read)
-            file.readall(body);
+        SetContentTypeByFilename(filepath);
+        file.readall(body);
         return 200;
     }
 

+ 1 - 1
http/httpdef.h

@@ -169,7 +169,7 @@ enum http_method {
     XX(IMAGE_BMP,               image/bmp,                bmp)          \
     XX(IMAGE_SVG,               image/svg,                svg)          \
     XX(VIDEO_AVI,               video/x-msvideo,          avi)          \
-    XX(VIDEO_TS,                video/mp2t,               ts)          \
+    XX(VIDEO_TS,                video/mp2t,               ts)           \
     XX(VIDEO_WEBM,              video/webm,               webm)         \
     XX(VIDEO_FLV,               video/x-flv,              flv)          \
     XX(VIDEO_MP4,               video/mp4,                mp4)          \

+ 2 - 2
http/server/FileCache.cpp

@@ -16,7 +16,7 @@ file_cache_ptr FileCache::Open(const char* filepath, OpenParam* param) {
     bool modified = false;
     if (fc) {
         time_t now = time(NULL);
-        if (now - fc->stat_time > file_stat_interval) {
+        if (now - fc->stat_time > stat_interval) {
             modified = fc->is_modified();
             fc->stat_time = now;
             fc->stat_cnt++;
@@ -148,7 +148,7 @@ void FileCache::RemoveExpiredFileCache() {
     time_t now = time(NULL);
     auto iter = cached_files.begin();
     while (iter != cached_files.end()) {
-        if (now - iter->second->stat_time > file_expired_time) {
+        if (now - iter->second->stat_time > expired_time) {
             iter = cached_files.erase(iter);
         } else {
             ++iter;

+ 4 - 6
http/server/FileCache.h

@@ -57,18 +57,16 @@ typedef std::shared_ptr<file_cache_t>           file_cache_ptr;
 // filepath => file_cache_ptr
 typedef std::map<std::string, file_cache_ptr>   FileCacheMap;
 
-#define DEFAULT_FILE_STAT_INTERVAL      10 // s
-#define DEFAULT_FILE_EXPIRED_TIME       60 // s
 class FileCache {
 public:
-    int file_stat_interval;
-    int file_expired_time;
     FileCacheMap    cached_files;
     std::mutex      mutex_;
+    int             stat_interval;
+    int             expired_time;
 
     FileCache() {
-        file_stat_interval = DEFAULT_FILE_STAT_INTERVAL;
-        file_expired_time  = DEFAULT_FILE_EXPIRED_TIME;
+        stat_interval = 10; // s
+        expired_time  = 60; // s
     }
 
     struct OpenParam {

+ 192 - 135
http/server/HttpHandler.cpp

@@ -3,16 +3,85 @@
 #include "hbase.h"
 #include "herr.h"
 #include "hlog.h"
+#include "htime.h"
 #include "hasync.h" // import hv::async for http_async_handler
 #include "http_page.h"
 
-#include "EventLoop.h"
-#include "htime.h"
-bool HttpHandler::SwitchWebSocket(hio_t* io, ws_session_type type) {
-    if(!io || !ws_service) return false;
+#include "EventLoop.h" // import hv::setInterval
+using namespace hv;
+
+HttpHandler::HttpHandler() {
+    protocol = UNKNOWN;
+    state = WANT_RECV;
+    ssl = false;
+    service = NULL;
+    ws_service = NULL;
+    last_send_ping_time = 0;
+    last_recv_pong_time = 0;
+
+    files = NULL;
+    file = NULL;
+}
+
+HttpHandler::~HttpHandler() {
+    closeFile();
+    if (writer) {
+        writer->status = hv::SocketChannel::DISCONNECTED;
+    }
+}
+
+bool HttpHandler::Init(int http_version, hio_t* io) {
+    parser.reset(HttpParser::New(HTTP_SERVER, (enum http_version)http_version));
+    if (parser == NULL) {
+        return false;
+    }
+    req.reset(new HttpRequest);
+    resp.reset(new HttpResponse);
+    if(http_version == 1) {
+        protocol = HTTP_V1;
+    } else if (http_version == 2) {
+        protocol = HTTP_V2;
+        resp->http_major = req->http_major = 2;
+        resp->http_minor = req->http_minor = 0;
+    }
+    parser->InitRequest(req.get());
+    if (io) {
+        writer.reset(new hv::HttpResponseWriter(io, resp));
+        writer->status = hv::SocketChannel::CONNECTED;
+    }
+    return true;
+}
+
+void HttpHandler::Reset() {
+    state = WANT_RECV;
+    req->Reset();
+    resp->Reset();
+    parser->InitRequest(req.get());
+    closeFile();
+    if (writer) {
+        writer->Begin();
+    }
+}
+
+bool HttpHandler::SwitchHTTP2() {
+    parser.reset(HttpParser::New(HTTP_SERVER, ::HTTP_V2));
+    if (parser == NULL) {
+        return false;
+    }
+    protocol = HTTP_V2;
+    resp->http_major = req->http_major = 2;
+    resp->http_minor = req->http_minor = 0;
+    parser->InitRequest(req.get());
+    return true;
+}
+
+bool HttpHandler::SwitchWebSocket(hio_t* io) {
+    if (!io && writer) io = writer->io();
+    if(!io) return false;
+
     protocol = WEBSOCKET;
     ws_parser.reset(new WebSocketParser);
-    ws_channel.reset(new hv::WebSocketChannel(io, type));
+    ws_channel.reset(new hv::WebSocketChannel(io, WS_SERVER));
     ws_parser->onMessage = [this](int opcode, const std::string& msg){
         switch(opcode) {
         case WS_OPCODE_CLOSE:
@@ -39,7 +108,7 @@ bool HttpHandler::SwitchWebSocket(hio_t* io, ws_session_type type) {
         }
     };
     // NOTE: cancel keepalive timer, judge alive by heartbeat.
-    hio_set_keepalive_timeout(io, 0);
+    ws_channel->setKeepaliveTimeout(0);
     if (ws_service && ws_service->ping_interval > 0) {
         int ping_interval = MAX(ws_service->ping_interval, 1000);
         ws_channel->setHeartbeat(ping_interval, [this](){
@@ -53,43 +122,9 @@ bool HttpHandler::SwitchWebSocket(hio_t* io, ws_session_type type) {
             }
         });
     }
-    // onopen
-    WebSocketOnOpen();
     return true;
 }
 
-HttpHandler::HttpHandler()
-{
-    protocol = UNKNOWN;
-    state = WANT_RECV;
-    ssl = false;
-    service = NULL;
-    files = NULL;
-    ws_service = NULL;
-    last_send_ping_time = 0;
-    last_recv_pong_time = 0;
-
-    flushing_ = false;
-    last_flush_size = 0;
-    last_flush_time = 0;
-    flush_timer = 0;
-}
-
-HttpHandler::~HttpHandler() {
-    if (writer) {
-        writer->status = hv::SocketChannel::DISCONNECTED;
-    }
-    resetFlush();
-}
-
-void HttpHandler::resetFlush(){
-    file.close();
-    if(flush_timer){
-        hv::killTimer(flush_timer);
-        flush_timer = 0;
-    }
-}
-
 int HttpHandler::customHttpHandler(const http_handler& handler) {
     return invokeHttpHandler(&handler);
 }
@@ -222,70 +257,65 @@ int HttpHandler::defaultStaticHandler() {
     if (req_path[1] == '\0') {
         filepath += service->home_page;
     }
+
+    // dir
     bool is_dir = filepath[filepath.size()-1] == '/';
-    bool is_index_of = false;
-    if (service->index_of.size() != 0 && hv_strstartswith(req_path, service->index_of.c_str())) {
-        is_index_of = true;
-    }
-    if (is_dir && !is_index_of) { // unsupport dir without index
+    bool is_index_of = service->index_of.size() != 0 && hv_strstartswith(req_path, service->index_of.c_str());
+    if (is_dir && !is_index_of) {
         return HTTP_STATUS_NOT_FOUND;
     }
 
     int status_code = HTTP_STATUS_OK;
+    // Range:
     bool has_range = false;
-    FileCache::OpenParam param;
     long from, to = 0;
-    // Range:
     if (req->GetRange(from, to)) {
         has_range = true;
-        if (file.open(filepath.c_str(), "rb") != 0) {
+        if (openFile(filepath.c_str()) != 0) {
             return HTTP_STATUS_NOT_FOUND;
         }
-        long total = file.size();
+        long total = file->size();
         if (to == 0 || to >= total) to = total - 1;
-        file.seek(from);
+        file->seek(from);
+        status_code = HTTP_STATUS_PARTIAL_CONTENT;
         resp->content_length = to - from + 1;
-        resp->File(filepath.c_str(), false);
+        resp->SetContentTypeByFilename(filepath.c_str());
         resp->SetRange(from, to, total);
-        if(resp->content_length < param.max_read) {
-            // range with memory
-            int nread = file.readrange(resp->body, from, to);
-            file.close();
+        if(resp->content_length < service->max_file_cache_size) {
+            // read into body directly
+            int nread = file->readrange(resp->body, from, to);
+            closeFile();
             if (nread != resp->content_length) {
                 resp->content_length = 0;
-                resp->Reset();
+                resp->body.clear();
                 return HTTP_STATUS_INTERNAL_SERVER_ERROR;
             }
-            return HTTP_STATUS_PARTIAL_CONTENT;
         }
-        else { // range with file cache
-            writer->WriteStatus(HTTP_STATUS_PARTIAL_CONTENT);
-            writer->EndHeaders();
-            return HTTP_STATUS_UNFINISHED;
+        else {
+            if (service->largeFileHandler) {
+                status_code = customHttpHandler(service->largeFileHandler);
+            } else {
+                status_code = defaultLargeFileHandler();
+            }
         }
+        return status_code;
     }
+
+    // FileCache
+    FileCache::OpenParam param;
+    param.max_read = service->max_file_cache_size;
     param.need_read = !(req->method == HTTP_HEAD || has_range);
     param.path = req_path;
     fc = files->Open(filepath.c_str(), &param);
     if (fc == NULL) {
-        // status_code = HTTP_STATUS_NOT_FOUND;
         if (param.error == ERR_OVER_LIMIT) {
-            /*
             if (service->largeFileHandler) {
                 status_code = customHttpHandler(service->largeFileHandler);
+            } else {
+                status_code = defaultLargeFileHandler();
             }
-            */
-
-            if (file.open(filepath.c_str(), "rb") != 0) {
-                return HTTP_STATUS_NOT_FOUND;
-            }
-            
-            // use file cache for large file
-            resp->content_length = file.size();
-            resp->File(filepath.c_str(), false);
-            writer->WriteStatus(HTTP_STATUS_OK);
-			writer->EndHeaders();
-            return HTTP_STATUS_UNFINISHED;
+        } else {
+            status_code = HTTP_STATUS_NOT_FOUND;
         }
     }
     else {
@@ -307,12 +337,50 @@ int HttpHandler::defaultStaticHandler() {
     return status_code;
 }
 
+int HttpHandler::defaultLargeFileHandler() {
+    if (!writer) return HTTP_STATUS_NOT_IMPLEMENTED;
+    if (!isFileOpened()) {
+        std::string filepath = service->document_root + req->Path();
+        if (openFile(filepath.c_str()) != 0) {
+            return HTTP_STATUS_NOT_FOUND;
+        }
+        resp->content_length = file->size();
+        resp->SetContentTypeByFilename(filepath.c_str());
+    }
+    if (service->limit_rate == 0) {
+        // forbidden to send large file
+        resp->content_length = 0;
+        resp->status_code = HTTP_STATUS_NOT_ACCEPTABLE;
+    } else {
+        size_t bufsize = 40960; // 40K
+        file->buf.resize(bufsize);
+        if (service->limit_rate < 0) {
+            // unlimited: sendFile when writable
+            writer->onwrite = [this](HBuf* buf) {
+                if (writer->isWriteComplete()) {
+                    sendFile();
+                }
+            };
+        } else {
+            // limit_rate=40KB/s  interval_ms=1000
+            // limit_rate=500KB/s interval_ms=80
+            int interval_ms = file->buf.len * 1000 / 1024 / service->limit_rate;
+            // limit_rate=40MB/s interval_m=1: 40KB/ms = 40MB/s = 320Mbps
+            if (interval_ms == 0) interval_ms = 1;
+            // printf("limit_rate=%dKB/s interval_ms=%d\n", service->limit_rate, interval_ms);
+            hv::setInterval(interval_ms, std::bind(&HttpHandler::sendFile, this));
+        }
+    }
+    writer->EndHeaders();
+    return HTTP_STATUS_UNFINISHED;
+}
+
 int HttpHandler::defaultErrorHandler() {
     // error page
     if (service->error_page.size() != 0) {
         std::string filepath = service->document_root + '/' + service->error_page;
+        // cache and load error page
         FileCache::OpenParam param;
-        // load error page from file cache..
         fc = files->Open(filepath.c_str(), &param);
     }
     // status page
@@ -361,6 +429,8 @@ int HttpHandler::GetSendData(char** data, size_t* len) {
             state = SEND_HEADER;
         case SEND_HEADER:
         {
+            size_t content_length = 0;
+            const char* content = NULL;
             // HEAD
             if (pReq->method == HTTP_HEAD) {
                 if (fc) {
@@ -371,8 +441,7 @@ int HttpHandler::GetSendData(char** data, size_t* len) {
                     pResp->headers["Content-Length"] = "0";
                 }
                 state = SEND_DONE;
-                pResp->content_length = 0;
-                goto return_header;
+                goto return_nobody;
             }
             // File service
             if (fc) {
@@ -386,19 +455,25 @@ int HttpHandler::GetSendData(char** data, size_t* len) {
                 return *len;
             }
             // API service
-            if (const char* content = (const char*)pResp->Content()) {
-                int content_length = pResp->ContentLength();
+            content_length = pResp->ContentLength();
+            content = (const char*)pResp->Content();
+            if (content) {
                 if (content_length > (1 << 20)) {
                     state = SEND_BODY;
+                    goto return_header;
                 } else {
                     // NOTE: header+body in one package if <= 1M
                     header = pResp->Dump(true, false);
                     header.append(content, content_length);
                     state = SEND_DONE;
+                    goto return_header;
                 }
             } else {
                 state = SEND_DONE;
+                goto return_header;
             }
+return_nobody:
+            pResp->content_length = 0;
 return_header:
             if (header.empty()) header = pResp->Dump(true, false);
             *data = (char*)header.c_str();
@@ -414,13 +489,12 @@ return_header:
         }
         case SEND_DONE:
         {
-            // NOTE: remove file cache if > 16M
+            // NOTE: remove file cache if > FILE_CACHE_MAX_SIZE
             if (fc && fc->filebuf.len > FILE_CACHE_MAX_SIZE) {
                 files->Close(fc);
             }
             fc = NULL;
             header.clear();
-            file.close();
             return 0;
         }
         default:
@@ -432,62 +506,45 @@ return_header:
     return 0;
 }
 
-void HttpHandler::flushFile() {
-    if(!writer || !file.isopen())
-        return ;
-    int len = 40960; // 416K
-#if 0
-    socklen_t optlen = sizeof(len);
-    getsockopt(writer->fd(), SOL_SOCKET, SO_SNDBUF, (char*)&len, &optlen);
-    if(len < 4096) len = 4096;
-    len++;
-#endif    
-    char* buff = NULL;
-    HV_ALLOC(buff, len);
-    flushing_ = true;
-    last_flush_time = gettick_ms();
-    while (resp->content_length > 0) {
-        size_t nread = file.read(buff, len);
-        if (nread <= 0) {
-            hlogi("%p flushFile finish\n", this);
-            file.close();
-            state = SEND_DONE;
-            break;
-        }
-        int ret = writer->write(buff, nread);
-        if (ret < 0) {
-            hlogi("%p flushFile netwrite error %d\n", this, ret);
-            state = SEND_DONE;
-            file.close();
-            break;
-        } 
-        else {
-            last_flush_size += ret;
-            resp->content_length -= ret;
-            if (ret != nread) {
-                hlogd("%p flushFile %d, file cur %d, %d remain\n", this, last_flush_size, file.tell(), resp->content_length);
-                break;
-            }
-        }
+int HttpHandler::openFile(const char* filepath) {
+    closeFile();
+    file = new LargeFile;
+    return file->open(filepath, "rb");
+}
+
+bool HttpHandler::isFileOpened() {
+    return file && file->isopen();
+}
+
+int HttpHandler::sendFile() {
+    if (!writer || !writer->isWriteComplete() ||
+        !isFileOpened() ||
+        resp->content_length == 0) {
+        return -1;
+    }
+
+    int readbytes = MIN(file->buf.len, resp->content_length);
+    size_t nread = file->read(file->buf.base, readbytes);
+    if (nread <= 0) {
+        hloge("read file error!");
+        writer->close(true);
+        return 0;
+    }
+    writer->WriteBody(file->buf.base, nread);
+    resp->content_length -= nread;
+    if (resp->content_length == 0) {
+        writer->End();
+        closeFile();
     }
-    HV_FREE(buff);
-    flushing_ = false;
+    return nread;
 }
 
-void HttpHandler::onWrite(hv::Buffer* buf) {
-    //printf("%p onWrite %d\n", this, buf->len);
-    if (protocol == HTTP_V1 && file.isopen()) {
-        if (writer->isWriteComplete() && !flushing_) {
-            int tick = 1;
-            int ms_delta = gettick_ms() - last_flush_time;
-            if (service->file_speed > 0) {
-                tick = last_flush_size / service->file_speed - ms_delta;
-                // timeout_ms can't be 0
-                if(tick < 1) tick = 1;
-            }
-            hlogd("%p flushFile after %d ms, speed %d kB/s\n", this, tick, last_flush_size/(ms_delta + tick));
-            flush_timer = hv::setTimeout(tick, std::bind(&HttpHandler::flushFile, this));
-            last_flush_size = 0;
+void HttpHandler::closeFile() {
+    if (file) {
+        if (file->timer != INVALID_TIMER_ID) {
+            hv::killTimer(file->timer);
         }
+        delete file;
+        file = NULL;
     }
 }

+ 21 - 62
http/server/HttpHandler.h

@@ -35,75 +35,35 @@ public:
 
     // for http
     HttpService             *service;
-    FileCache               *files;
-
     HttpRequestPtr          req;
     HttpResponsePtr         resp;
     HttpResponseWriterPtr   writer;
     HttpParserPtr           parser;
 
+    // for sendfile
+    FileCache               *files;
+    file_cache_ptr          fc; // cache small file
+    struct LargeFile : public HFile {
+        HBuf                buf;
+        uint64_t            timer;
+    } *file; // for large file
+
     // for GetSendData
-    file_cache_ptr          fc;
     std::string             header;
-    // std::string             body;
+    // std::string          body;
 
     // for websocket
+    WebSocketService*           ws_service;
     WebSocketChannelPtr         ws_channel;
     WebSocketParserPtr          ws_parser;
     uint64_t                    last_send_ping_time;
     uint64_t                    last_recv_pong_time;
-    WebSocketService*           ws_service;
 
     HttpHandler();
     ~HttpHandler();
 
-    bool Init(int http_version = 1, hio_t* io = NULL) {
-        parser.reset(HttpParser::New(HTTP_SERVER, (enum http_version)http_version));
-        if (parser == NULL) {
-            return false;
-        }
-        req.reset(new HttpRequest);
-        resp.reset(new HttpResponse);
-        if (http_version == 2) {
-            protocol = HTTP_V2;
-            resp->http_major = req->http_major = 2;
-            resp->http_minor = req->http_minor = 0;
-        }
-        else if(http_version == 1) {
-            protocol = HTTP_V1;
-        }
-        parser->InitRequest(req.get());
-        if (io) {
-            // shared resp object with HttpResponseWriter
-            writer.reset(new hv::HttpResponseWriter(io, resp));
-            writer->status = hv::SocketChannel::CONNECTED;
-            writer->onwrite = std::bind(&HttpHandler::onWrite, this, std::placeholders::_1);
-        }
-        return true;
-    }
-
-    bool SwitchHTTP2() {
-        parser.reset(HttpParser::New(HTTP_SERVER, ::HTTP_V2));
-        if (parser == NULL) {
-            return false;
-        }
-        protocol = HTTP_V2;
-        resp->http_major = req->http_major = 2;
-        resp->http_minor = req->http_minor = 0;
-        parser->InitRequest(req.get());
-        return true;
-    }
-
-    void Reset() {
-        state = WANT_RECV;
-        req->Reset();
-        resp->Reset();
-        parser->InitRequest(req.get());
-        if (writer) {
-            writer->Begin();
-        }
-        resetFlush();
-    }
+    bool Init(int http_version = 1, hio_t* io = NULL);
+    void Reset();
 
     int FeedRecvData(const char* data, size_t len);
     // @workflow: preprocessor -> api -> web -> postprocessor
@@ -111,9 +71,11 @@ public:
     int HandleHttpRequest();
     int GetSendData(char** data, size_t* len);
 
-    // websocket
-    bool SwitchWebSocket(hio_t* io, ws_session_type type = WS_SERVER);
+    // HTTP2
+    bool SwitchHTTP2();
 
+    // websocket
+    bool SwitchWebSocket(hio_t* io = NULL);
     void WebSocketOnOpen() {
         ws_channel->status = hv::SocketChannel::CONNECTED;
         if (ws_service && ws_service->onopen) {
@@ -128,17 +90,14 @@ public:
     }
 
 private:
-    HFile file; ///< file cache body
-    uint64_t flush_timer;
-    bool flushing_;
-    int last_flush_size;
-    uint64_t last_flush_time;
-    void flushFile();
-    void resetFlush();
-    void onWrite(hv::Buffer* buf);
+    int  openFile(const char* filepath);
+    int  sendFile();
+    void closeFile();
+    bool isFileOpened();
 
     int defaultRequestHandler();
     int defaultStaticHandler();
+    int defaultLargeFileHandler();
     int defaultErrorHandler();
     int customHttpHandler(const http_handler& handler);
     int invokeHttpHandler(const http_handler* handler);

+ 41 - 32
http/server/HttpServer.cpp

@@ -20,20 +20,12 @@ static void on_accept(hio_t* io);
 static void on_recv(hio_t* io, void* _buf, int readbytes);
 static void on_close(hio_t* io);
 
-static HttpService* default_http_service() {
-    static HttpService* s_default_service = new HttpService;
-    return s_default_service;
-}
-
-static FileCache* default_filecache() {
-    static FileCache s_filecache;
-    return &s_filecache;
-}
-
 struct HttpServerPrivdata {
-    std::vector<EventLoopPtr>   loops;
-    std::vector<hthread_t>      threads;
-    std::mutex                  mutex_;
+    std::vector<EventLoopPtr>       loops;
+    std::vector<hthread_t>          threads;
+    std::mutex                      mutex_;
+    std::shared_ptr<HttpService>    service;
+    FileCache                       filecache;
 };
 
 static void on_recv(hio_t* io, void* _buf, int readbytes) {
@@ -107,6 +99,7 @@ static void on_recv(hio_t* io, void* _buf, int readbytes) {
 
     // Upgrade:
     bool upgrade = false;
+    HttpHandler::ProtocolType upgrade_protocol = HttpHandler::UNKNOWN;
     auto iter_upgrade = req->headers.find("upgrade");
     if (iter_upgrade != req->headers.end()) {
         upgrade = true;
@@ -114,11 +107,6 @@ static void on_recv(hio_t* io, void* _buf, int readbytes) {
         hlogi("[%s:%d] Upgrade: %s", handler->ip, handler->port, upgrade_proto);
         // websocket
         if (stricmp(upgrade_proto, "websocket") == 0) {
-            if (!handler->SwitchWebSocket(io)) {
-                hloge("[%s:%d] unsupported websocket", handler->ip, handler->port);
-                hio_close(io);
-                return;
-            }
             /*
             HTTP/1.1 101 Switching Protocols
             Connection: Upgrade
@@ -134,10 +122,8 @@ static void on_recv(hio_t* io, void* _buf, int readbytes) {
                 ws_encode_key(iter_key->second.c_str(), ws_accept);
                 resp->headers[SEC_WEBSOCKET_ACCEPT] = ws_accept;
             }
-            
-            // write upgrade resp
-            std::string header = resp->Dump(true, false);
-            hio_write(io, header.data(), header.length());
+            upgrade_protocol = HttpHandler::WEBSOCKET;
+            // NOTE: SwitchWebSocket after send handshake response
         }
         // h2/h2c
         else if (strnicmp(upgrade_proto, "h2", 2) == 0) {
@@ -182,6 +168,17 @@ static void on_recv(hio_t* io, void* _buf, int readbytes) {
         http_method_str(req->method), req->path.c_str(),
         resp->status_code, resp->status_message());
 
+    // switch protocol to websocket
+    if (upgrade && upgrade_protocol == HttpHandler::WEBSOCKET) {
+        if (!handler->SwitchWebSocket(io)) {
+            hloge("[%s:%d] unsupported websocket", handler->ip, handler->port);
+            hio_close(io);
+            return;
+        }
+        // onopen
+        handler->WebSocketOnOpen();
+    }
+
     if (status_code && !keepalive) {
         hio_close(io);
     }
@@ -229,12 +226,14 @@ static void on_accept(hio_t* io) {
     // websocket service
     handler->ws_service = server->ws;
     // FileCache
-    handler->files = default_filecache();
+    HttpServerPrivdata* privdata = (HttpServerPrivdata*)server->privdata;
+    handler->files = &privdata->filecache;
     hevent_set_userdata(io, handler);
 }
 
 static void loop_thread(void* userdata) {
     http_server_t* server = (http_server_t*)userdata;
+    HttpService* service = server->service;
 
     EventLoopPtr loop(new EventLoop);
     hloop_t* hloop = loop->loop();
@@ -258,15 +257,25 @@ static void loop_thread(void* userdata) {
         hidle_add(hloop, [](hidle_t*) {
             hlog_fsync();
         }, INFINITE);
-        // NOTE: add timer to remove expired file cache
-        htimer_add(hloop, [](htimer_t*) {
-            FileCache* filecache = default_filecache();
-            filecache->RemoveExpiredFileCache();
-        }, DEFAULT_FILE_EXPIRED_TIME * 1000);
+
         // NOTE: add timer to update s_date every 1s
         htimer_add(hloop, [](htimer_t* timer) {
             gmtime_fmt(hloop_now(hevent_loop(timer)), HttpMessage::s_date);
         }, 1000);
+
+        // FileCache
+        FileCache* filecache = &privdata->filecache;
+        filecache->stat_interval = service->file_cache_stat_interval;
+        filecache->expired_time = service->file_cache_expired_time;
+        if (filecache->expired_time > 0) {
+            filecache->expired_time = service->file_cache_expired_time;
+            // NOTE: add timer to remove expired file cache
+            htimer_t* timer = htimer_add(hloop, [](htimer_t* timer) {
+                FileCache* filecache = (FileCache*)hevent_userdata(timer);
+                filecache->RemoveExpiredFileCache();
+            },  filecache->expired_time * 1000);
+            hevent_set_userdata(timer, filecache);
+        }
     }
     privdata->loops.push_back(loop);
     privdata->mutex_.unlock();
@@ -287,13 +296,13 @@ int http_server_run(http_server_t* server, int wait) {
         if (server->listenfd[1] < 0) return server->listenfd[1];
         hlogi("https server listening on %s:%d", server->host, server->https_port);
     }
-    // service
-    if (server->service == NULL) {
-        server->service = default_http_service();
-    }
 
     HttpServerPrivdata* privdata = new HttpServerPrivdata;
     server->privdata = privdata;
+    if (server->service == NULL) {
+        privdata->service.reset(new HttpService);
+        server->service = privdata->service.get();
+    }
 
     if (server->worker_processes) {
         // multi-processes

+ 19 - 1
http/server/HttpService.h

@@ -20,6 +20,11 @@
 #define DEFAULT_INDEXOF_DIR     "/downloads/"
 #define DEFAULT_KEEPALIVE_TIMEOUT   75000   // ms
 
+// for FileCache
+#define MAX_FILE_CACHE_SIZE                 (1 << 22)   // 4M
+#define DEFAULT_FILE_CACHE_STAT_INTERVAL    10          // s
+#define DEFAULT_FILE_CACHE_EXPIRED_TIME     60          // s
+
 /*
  * @param[in]  req:  parsed structured http request
  * @param[out] resp: structured http response
@@ -107,11 +112,20 @@ struct HV_EXPORT HttpService {
     std::string     error_page;
     // indexof service (that is http.DirectoryServer)
     std::string     index_of;
-    int file_speed = 0; // file download speed limit(KB/s, <=0 no limit)
     http_handler    errorHandler;
 
     // options
     int keepalive_timeout;
+    int max_file_cache_size;        // cache small file
+    int file_cache_stat_interval;   // stat file is modified
+    int file_cache_expired_time;    // remove expired file cache
+    /*
+     * @test    limit_rate
+     * @build   make examples
+     * @server  bin/httpd -c etc/httpd.conf -s restart -d
+     * @client  bin/wget http://127.0.0.1:8080/downloads/test.zip
+     */
+    int limit_rate; // limit send rate, unit: KB/s
 
     HttpService() {
         // base_url = DEFAULT_BASE_URL;
@@ -122,6 +136,10 @@ struct HV_EXPORT HttpService {
         // index_of = DEFAULT_INDEXOF_DIR;
 
         keepalive_timeout = DEFAULT_KEEPALIVE_TIMEOUT;
+        max_file_cache_size = MAX_FILE_CACHE_SIZE;
+        file_cache_stat_interval = DEFAULT_FILE_CACHE_STAT_INTERVAL;
+        file_cache_expired_time = DEFAULT_FILE_CACHE_EXPIRED_TIME;
+        limit_rate = -1; // unlimited
     }
 
     void AddApi(const char* path, http_method method, const http_handler& handler);