소스 검색

完善HttpService对大文件的支持:
- 删除largeFileHandler, 并通过onWrite回调实现一个支持限速的文件下载缓存
- Range下载可根据请求大小, 采用以文件缓存或内存方式提供数据;
- 减少文件最大缓存大小 64M -> 4M, 减低作为文件服务器的内存使用

cqm 3 년 전
부모
커밋
789045ec6b
5개의 변경된 파일193개의 추가작업 그리고 89개의 파일을 삭제
  1. 3 2
      http/HttpMessage.h
  2. 1 1
      http/server/FileCache.h
  3. 168 69
      http/server/HttpHandler.cpp
  4. 20 16
      http/server/HttpHandler.h
  5. 1 1
      http/server/HttpService.h

+ 3 - 2
http/HttpMessage.h

@@ -354,7 +354,7 @@ public:
         return 200;
     }
 
-    int File(const char* filepath) {
+    int File(const char* filepath, bool read = true) {
         HFile file;
         if (file.open(filepath, "rb") != 0) {
             return HTTP_STATUS_NOT_FOUND;
@@ -366,7 +366,8 @@ public:
         if (content_type == CONTENT_TYPE_NONE || content_type == CONTENT_TYPE_UNDEFINED) {
             content_type = APPLICATION_OCTET_STREAM;
         }
-        file.readall(body);
+        if (read)
+            file.readall(body);
         return 200;
     }
 

+ 1 - 1
http/server/FileCache.h

@@ -10,7 +10,7 @@
 #include "hstring.h"
 
 #define HTTP_HEADER_MAX_LENGTH      1024        // 1K
-#define FILE_CACHE_MAX_SIZE         (1 << 26)   // 64M
+#define FILE_CACHE_MAX_SIZE         (1 << 22)   // 4M
 
 typedef struct file_cache_s {
     std::string filepath;

+ 168 - 69
http/server/HttpHandler.cpp

@@ -6,6 +6,7 @@
 #include "hasync.h" // import hv::async for http_async_handler
 #include "http_page.h"
 
+#include "EventLoop.h"
 #include "htime.h"
 bool HttpHandler::SwitchWebSocket(hio_t* io, ws_session_type type) {
     if(!io || !ws_service) return false;
@@ -57,6 +58,38 @@ bool HttpHandler::SwitchWebSocket(hio_t* io, ws_session_type type) {
     return true;
 }
 
+HttpHandler::HttpHandler()
+{
+    protocol = UNKNOWN;
+    state = WANT_RECV;
+    ssl = false;
+    service = NULL;
+    files = NULL;
+    ws_service = NULL;
+    last_send_ping_time = 0;
+    last_recv_pong_time = 0;
+
+    flushing_ = false;
+    last_flush_size = 0;
+    last_flush_time = 0;
+    flush_timer = 0;
+}
+
+HttpHandler::~HttpHandler() {
+    if (writer) {
+        writer->status = hv::SocketChannel::DISCONNECTED;
+    }
+    resetFlush();
+}
+
+void HttpHandler::resetFlush(){
+    file.close();
+    if(flush_timer){
+        hv::killTimer(flush_timer);
+        flush_timer = 0;
+    }
+}
+
 int HttpHandler::customHttpHandler(const http_handler& handler) {
     return invokeHttpHandler(&handler);
 }
@@ -179,7 +212,6 @@ int HttpHandler::defaultRequestHandler() {
 
 int HttpHandler::defaultStaticHandler() {
     // file service
-    int status_code = HTTP_STATUS_OK;
     std::string path = req->Path();
     const char* req_path = path.c_str();
     // path safe check
@@ -190,44 +222,86 @@ int HttpHandler::defaultStaticHandler() {
     if (req_path[1] == '\0') {
         filepath += service->home_page;
     }
-    bool is_dir = filepath.c_str()[filepath.size()-1] == '/';
+    bool is_dir = filepath[filepath.size()-1] == '/';
     bool is_index_of = false;
     if (service->index_of.size() != 0 && hv_strstartswith(req_path, service->index_of.c_str())) {
         is_index_of = true;
     }
-    if (!is_dir || is_index_of) {
-        FileCache::OpenParam param;
-        bool has_range = req->headers.find("Range") != req->headers.end();
-        param.need_read = !(req->method == HTTP_HEAD || has_range);
-        param.path = req_path;
-        fc = files->Open(filepath.c_str(), &param);
-        if (fc == NULL) {
-            status_code = HTTP_STATUS_NOT_FOUND;
-            if (param.error == ERR_OVER_LIMIT) {
-                if (service->largeFileHandler) {
-                    status_code = customHttpHandler(service->largeFileHandler);
-                }
+    if (is_dir && !is_index_of) { // unsupport dir without index
+        return HTTP_STATUS_NOT_FOUND;
+    }
+
+    int status_code = HTTP_STATUS_OK;
+    bool has_range = false;
+    FileCache::OpenParam param;
+    long from, to = 0;
+    // Range:
+    if (req->GetRange(from, to)) {
+        has_range = true;
+        if (file.open(filepath.c_str(), "rb") != 0) {
+            return HTTP_STATUS_NOT_FOUND;
+        }
+        long total = file.size();
+        if (to == 0 || to >= total) to = total - 1;
+        file.seek(from);
+        resp->content_length = to - from + 1;
+        resp->File(filepath.c_str(), false);
+        resp->SetRange(from, to, total);
+        if(resp->content_length < param.max_read) {
+            // range with memory
+            int nread = file.readrange(resp->body, from, to);
+            file.close();
+            if (nread != resp->content_length) {
+                resp->content_length = 0;
+                resp->Reset();
+                return HTTP_STATUS_INTERNAL_SERVER_ERROR;
             }
+            return HTTP_STATUS_PARTIAL_CONTENT;
+        }
+        else { // range with file cache
+            writer->WriteStatus(HTTP_STATUS_PARTIAL_CONTENT);
+            writer->EndHeaders();
+            return HTTP_STATUS_UNFINISHED;
         }
-    } else {
-        status_code = HTTP_STATUS_NOT_FOUND;
     }
+    param.need_read = !(req->method == HTTP_HEAD || has_range);
+    param.path = req_path;
+    fc = files->Open(filepath.c_str(), &param);
+    if (fc == NULL) {
+        // status_code = HTTP_STATUS_NOT_FOUND;
+        if (param.error == ERR_OVER_LIMIT) {
+            /*
+            if (service->largeFileHandler) {
+                status_code = customHttpHandler(service->largeFileHandler);
+            }
+            */
 
-    if (fc) {
+            if (file.open(filepath.c_str(), "rb") != 0) {
+                return HTTP_STATUS_NOT_FOUND;
+            }
+            
+            // use file cache for large file
+            resp->content_length = file.size();
+            resp->File(filepath.c_str(), false);
+            writer->WriteStatus(HTTP_STATUS_OK);
+			writer->EndHeaders();
+            return HTTP_STATUS_UNFINISHED;
+        }
+    }
+    else {
         // Not Modified
         auto iter = req->headers.find("if-not-match");
         if (iter != req->headers.end() &&
             strcmp(iter->second.c_str(), fc->etag) == 0) {
-            status_code = HTTP_STATUS_NOT_MODIFIED;
             fc = NULL;
+            return HTTP_STATUS_NOT_MODIFIED;
         }
-        else {
-            iter = req->headers.find("if-modified-since");
-            if (iter != req->headers.end() &&
-                strcmp(iter->second.c_str(), fc->last_modified) == 0) {
-                status_code = HTTP_STATUS_NOT_MODIFIED;
-                fc = NULL;
-            }
+
+        iter = req->headers.find("if-modified-since");
+        if (iter != req->headers.end() &&
+            strcmp(iter->second.c_str(), fc->last_modified) == 0) {
+            fc = NULL;
+            return HTTP_STATUS_NOT_MODIFIED;
         }
     }
     return status_code;
@@ -238,6 +312,7 @@ int HttpHandler::defaultErrorHandler() {
     if (service->error_page.size() != 0) {
         std::string filepath = service->document_root + '/' + service->error_page;
         FileCache::OpenParam param;
+        // load error page from file cache..
         fc = files->Open(filepath.c_str(), &param);
     }
     // status page
@@ -286,8 +361,6 @@ int HttpHandler::GetSendData(char** data, size_t* len) {
             state = SEND_HEADER;
         case SEND_HEADER:
         {
-            int content_length = 0;
-            const char* content = NULL;
             // HEAD
             if (pReq->method == HTTP_HEAD) {
                 if (fc) {
@@ -298,34 +371,11 @@ int HttpHandler::GetSendData(char** data, size_t* len) {
                     pResp->headers["Content-Length"] = "0";
                 }
                 state = SEND_DONE;
-                goto return_nobody;
+                pResp->content_length = 0;
+                goto return_header;
             }
             // File service
             if (fc) {
-                long from, to, total;
-                int nread;
-                // Range:
-                if (pReq->GetRange(from, to)) {
-                    HFile file;
-                    if (file.open(fc->filepath.c_str(), "rb") != 0) {
-                        pResp->status_code = HTTP_STATUS_NOT_FOUND;
-                        state = SEND_DONE;
-                        goto return_nobody;
-                    }
-                    total = file.size();
-                    if (to == 0 || to >= total) to = total - 1;
-                    pResp->content_length = to - from + 1;
-                    nread = file.readrange(body, from, to);
-                    if (nread != pResp->content_length) {
-                        pResp->status_code = HTTP_STATUS_INTERNAL_SERVER_ERROR;
-                        state = SEND_DONE;
-                        goto return_nobody;
-                    }
-                    pResp->status_code = HTTP_STATUS_PARTIAL_CONTENT;
-                    pResp->SetRange(from, to, total);
-                    state = SEND_BODY;
-                    goto return_header;
-                }
                 // FileCache
                 // NOTE: no copy filebuf, more efficient
                 header = pResp->Dump(true, false);
@@ -336,25 +386,19 @@ int HttpHandler::GetSendData(char** data, size_t* len) {
                 return *len;
             }
             // API service
-            content_length = pResp->ContentLength();
-            content = (const char*)pResp->Content();
-            if (content) {
+            if (const char* content = (const char*)pResp->Content()) {
+                int content_length = pResp->ContentLength();
                 if (content_length > (1 << 20)) {
                     state = SEND_BODY;
-                    goto return_header;
                 } else {
                     // NOTE: header+body in one package if <= 1M
                     header = pResp->Dump(true, false);
                     header.append(content, content_length);
                     state = SEND_DONE;
-                    goto return_header;
                 }
             } else {
                 state = SEND_DONE;
-                goto return_header;
             }
-return_nobody:
-            pResp->content_length = 0;
 return_header:
             if (header.empty()) header = pResp->Dump(true, false);
             *data = (char*)header.c_str();
@@ -363,25 +407,20 @@ return_header:
         }
         case SEND_BODY:
         {
-            if (body.empty()) {
-                *data = (char*)pResp->Content();
-                *len = pResp->ContentLength();
-            } else {
-                *data = (char*)body.c_str();
-                *len = body.size();
-            }
+            *data = (char*)pResp->Content();
+            *len = pResp->ContentLength();
             state = SEND_DONE;
             return *len;
         }
         case SEND_DONE:
         {
             // NOTE: remove file cache if > 16M
-            if (fc && fc->filebuf.len > (1 << 24)) {
+            if (fc && fc->filebuf.len > FILE_CACHE_MAX_SIZE) {
                 files->Close(fc);
             }
             fc = NULL;
             header.clear();
-            body.clear();
+            file.close();
             return 0;
         }
         default:
@@ -392,3 +431,63 @@ return_header:
     }
     return 0;
 }
+
+void HttpHandler::flushFile() {
+    if(!writer || !file.isopen())
+        return ;
+    int len = 40960; // 416K
+#if 0
+    socklen_t optlen = sizeof(len);
+    getsockopt(writer->fd(), SOL_SOCKET, SO_SNDBUF, (char*)&len, &optlen);
+    if(len < 4096) len = 4096;
+    len++;
+#endif    
+    char* buff = NULL;
+    HV_ALLOC(buff, len);
+    flushing_ = true;
+    last_flush_time = gettick_ms();
+    while (resp->content_length > 0) {
+        size_t nread = file.read(buff, len);
+        if (nread <= 0) {
+            hlogi("%p flushFile finish\n", this);
+            file.close();
+            state = SEND_DONE;
+            break;
+        }
+        int ret = writer->write(buff, nread);
+        if (ret < 0) {
+            hlogi("%p flushFile netwrite error %d\n", this, ret);
+            state = SEND_DONE;
+            file.close();
+            break;
+        } 
+        else {
+            last_flush_size += ret;
+            resp->content_length -= ret;
+            if (ret != nread) {
+                hlogd("%p flushFile %d, file cur %d, %d remain\n", this, last_flush_size, file.tell(), resp->content_length);
+                break;
+            }
+        }
+    }
+    HV_FREE(buff);
+    flushing_ = false;
+}
+
+void HttpHandler::onWrite(hv::Buffer* buf) {
+    //printf("%p onWrite %d\n", this, buf->len);
+    if (protocol == HTTP_V1 && file.isopen()) {
+        if (writer->isWriteComplete() && !flushing_) {
+            int tick = 1;
+            int ms_delta = gettick_ms() - last_flush_time;
+            if (service->file_speed > 0) {
+                tick = last_flush_size / service->file_speed - ms_delta;
+                // timeout_ms can't be 0
+                if(tick < 1) tick = 1;
+            }
+            hlogd("%p flushFile after %d ms, speed %d kB/s\n", this, tick, last_flush_size/(ms_delta + tick));
+            flush_timer = hv::setTimeout(tick, std::bind(&HttpHandler::flushFile, this));
+            last_flush_size = 0;
+        }
+    }
+}

+ 20 - 16
http/server/HttpHandler.h

@@ -16,6 +16,7 @@ public:
         HTTP_V2,
         WEBSOCKET,
     } protocol;
+
     enum State {
         WANT_RECV,
         HANDLE_BEGIN,
@@ -44,7 +45,7 @@ public:
     // for GetSendData
     file_cache_ptr          fc;
     std::string             header;
-    std::string             body;
+    // std::string             body;
 
     // for websocket
     WebSocketChannelPtr         ws_channel;
@@ -53,37 +54,30 @@ public:
     uint64_t                    last_recv_pong_time;
     WebSocketService*           ws_service;
 
-    HttpHandler() {
-        protocol = UNKNOWN;
-        state = WANT_RECV;
-        ssl = false;
-        service = NULL;
-        files = NULL;
-        ws_service = NULL;
-    }
-
-    ~HttpHandler() {
-        if (writer) {
-            writer->status = hv::SocketChannel::DISCONNECTED;
-        }
-    }
+    HttpHandler();
+    ~HttpHandler();
 
     bool Init(int http_version = 1, hio_t* io = NULL) {
         parser.reset(HttpParser::New(HTTP_SERVER, (enum http_version)http_version));
         if (parser == NULL) {
             return false;
         }
-        protocol = http_version == 1 ? HTTP_V1 : HTTP_V2;
         req.reset(new HttpRequest);
         resp.reset(new HttpResponse);
         if (http_version == 2) {
+            protocol = HTTP_V2;
             resp->http_major = req->http_major = 2;
             resp->http_minor = req->http_minor = 0;
         }
+        else if(http_version == 1) {
+            protocol = HTTP_V1;
+        }
         parser->InitRequest(req.get());
         if (io) {
+            // shared resp object with HttpResponseWriter
             writer.reset(new hv::HttpResponseWriter(io, resp));
             writer->status = hv::SocketChannel::CONNECTED;
+            writer->onwrite = std::bind(&HttpHandler::onWrite, this, std::placeholders::_1);
         }
         return true;
     }
@@ -108,6 +102,7 @@ public:
         if (writer) {
             writer->Begin();
         }
+        resetFlush();
     }
 
     int FeedRecvData(const char* data, size_t len);
@@ -133,6 +128,15 @@ public:
     }
 
 private:
+    HFile file; ///< file cache body
+    uint64_t flush_timer;
+    bool flushing_;
+    int last_flush_size;
+    uint64_t last_flush_time;
+    void flushFile();
+    void resetFlush();
+    void onWrite(hv::Buffer* buf);
+
     int defaultRequestHandler();
     int defaultStaticHandler();
     int defaultErrorHandler();

+ 1 - 1
http/server/HttpService.h

@@ -107,7 +107,7 @@ struct HV_EXPORT HttpService {
     std::string     error_page;
     // indexof service (that is http.DirectoryServer)
     std::string     index_of;
-
+    int file_speed = 0; // file download speed limit(KB/s, <=0 no limit)
     http_handler    errorHandler;
 
     // options