| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493 |
- #include "HttpHandler.h"
- #include "hbase.h"
- #include "herr.h"
- #include "hlog.h"
- #include "hasync.h" // import hv::async for http_async_handler
- #include "http_page.h"
- #include "EventLoop.h"
- #include "htime.h"
- bool HttpHandler::SwitchWebSocket(hio_t* io, ws_session_type type) {
- if(!io || !ws_service) return false;
- protocol = WEBSOCKET;
- ws_parser.reset(new WebSocketParser);
- ws_channel.reset(new hv::WebSocketChannel(io, type));
- ws_parser->onMessage = [this](int opcode, const std::string& msg){
- switch(opcode) {
- case WS_OPCODE_CLOSE:
- ws_channel->close(true);
- break;
- case WS_OPCODE_PING:
- // printf("recv ping\n");
- // printf("send pong\n");
- ws_channel->sendPong();
- break;
- case WS_OPCODE_PONG:
- // printf("recv pong\n");
- this->last_recv_pong_time = gethrtime_us();
- break;
- case WS_OPCODE_TEXT:
- case WS_OPCODE_BINARY:
- // onmessage
- if (ws_service && ws_service->onmessage) {
- ws_service->onmessage(ws_channel, msg);
- }
- break;
- default:
- break;
- }
- };
- // NOTE: cancel keepalive timer, judge alive by heartbeat.
- hio_set_keepalive_timeout(io, 0);
- if (ws_service && ws_service->ping_interval > 0) {
- int ping_interval = MAX(ws_service->ping_interval, 1000);
- ws_channel->setHeartbeat(ping_interval, [this](){
- if (last_recv_pong_time < last_send_ping_time) {
- hlogw("[%s:%d] websocket no pong!", ip, port);
- ws_channel->close(true);
- } else {
- // printf("send ping\n");
- ws_channel->sendPing();
- last_send_ping_time = gethrtime_us();
- }
- });
- }
- // onopen
- WebSocketOnOpen();
- return true;
- }
- HttpHandler::HttpHandler()
- {
- protocol = UNKNOWN;
- state = WANT_RECV;
- ssl = false;
- service = NULL;
- files = NULL;
- ws_service = NULL;
- last_send_ping_time = 0;
- last_recv_pong_time = 0;
- flushing_ = false;
- last_flush_size = 0;
- last_flush_time = 0;
- flush_timer = 0;
- }
- HttpHandler::~HttpHandler() {
- if (writer) {
- writer->status = hv::SocketChannel::DISCONNECTED;
- }
- resetFlush();
- }
- void HttpHandler::resetFlush(){
- file.close();
- if(flush_timer){
- hv::killTimer(flush_timer);
- flush_timer = 0;
- }
- }
- int HttpHandler::customHttpHandler(const http_handler& handler) {
- return invokeHttpHandler(&handler);
- }
- int HttpHandler::invokeHttpHandler(const http_handler* handler) {
- int status_code = HTTP_STATUS_NOT_IMPLEMENTED;
- if (handler->sync_handler) {
- // NOTE: sync_handler run on IO thread
- status_code = handler->sync_handler(req.get(), resp.get());
- } else if (handler->async_handler) {
- // NOTE: async_handler run on hv::async threadpool
- hv::async(std::bind(handler->async_handler, req, writer));
- status_code = HTTP_STATUS_UNFINISHED;
- } else if (handler->ctx_handler) {
- HttpContextPtr ctx(new hv::HttpContext);
- ctx->service = service;
- ctx->request = req;
- ctx->response = resp;
- ctx->writer = writer;
- // NOTE: ctx_handler run on IO thread, you can easily post HttpContextPtr to your consumer thread for processing.
- status_code = handler->ctx_handler(ctx);
- if (writer && writer->state != hv::HttpResponseWriter::SEND_BEGIN) {
- status_code = HTTP_STATUS_UNFINISHED;
- }
- }
- return status_code;
- }
- int HttpHandler::HandleHttpRequest() {
- // preprocessor -> processor -> postprocessor
- int status_code = HTTP_STATUS_OK;
- HttpRequest* pReq = req.get();
- HttpResponse* pResp = resp.get();
- pReq->scheme = ssl ? "https" : "http";
- pReq->client_addr.ip = ip;
- pReq->client_addr.port = port;
- pReq->Host();
- pReq->ParseUrl();
- // NOTE: Not all users want to parse body, we comment it out.
- // pReq->ParseBody();
- preprocessor:
- state = HANDLE_BEGIN;
- if (service->preprocessor) {
- status_code = customHttpHandler(service->preprocessor);
- if (status_code != 0) {
- goto postprocessor;
- }
- }
- processor:
- if (service->processor) {
- status_code = customHttpHandler(service->processor);
- } else {
- status_code = defaultRequestHandler();
- }
- postprocessor:
- if (status_code >= 100 && status_code < 600) {
- pResp->status_code = (http_status)status_code;
- }
- if (pResp->status_code >= 400 && pResp->body.size() == 0 && pReq->method != HTTP_HEAD) {
- if (service->errorHandler) {
- customHttpHandler(service->errorHandler);
- } else {
- defaultErrorHandler();
- }
- }
- if (fc) {
- pResp->content = fc->filebuf.base;
- pResp->content_length = fc->filebuf.len;
- pResp->headers["Content-Type"] = fc->content_type;
- pResp->headers["Last-Modified"] = fc->last_modified;
- pResp->headers["Etag"] = fc->etag;
- }
- if (service->postprocessor) {
- customHttpHandler(service->postprocessor);
- }
- if (status_code == 0) {
- state = HANDLE_CONTINUE;
- } else {
- state = HANDLE_END;
- parser->SubmitResponse(resp.get());
- }
- return status_code;
- }
- int HttpHandler::defaultRequestHandler() {
- int status_code = HTTP_STATUS_OK;
- http_handler* handler = NULL;
- if (service->api_handlers.size() != 0) {
- service->GetApi(req.get(), &handler);
- }
- if (handler) {
- status_code = invokeHttpHandler(handler);
- }
- else if (req->method == HTTP_GET || req->method == HTTP_HEAD) {
- // static handler
- if (service->staticHandler) {
- status_code = customHttpHandler(service->staticHandler);
- }
- else if (service->document_root.size() != 0) {
- status_code = defaultStaticHandler();
- }
- else {
- status_code = HTTP_STATUS_NOT_FOUND;
- }
- }
- else {
- // Not Implemented
- status_code = HTTP_STATUS_NOT_IMPLEMENTED;
- }
- return status_code;
- }
- int HttpHandler::defaultStaticHandler() {
- // file service
- std::string path = req->Path();
- const char* req_path = path.c_str();
- // path safe check
- if (req_path[0] != '/' || strstr(req_path, "/../")) {
- return HTTP_STATUS_BAD_REQUEST;
- }
- std::string filepath = service->document_root + path;
- if (req_path[1] == '\0') {
- filepath += service->home_page;
- }
- bool is_dir = filepath[filepath.size()-1] == '/';
- bool is_index_of = false;
- if (service->index_of.size() != 0 && hv_strstartswith(req_path, service->index_of.c_str())) {
- is_index_of = true;
- }
- if (is_dir && !is_index_of) { // unsupport dir without index
- return HTTP_STATUS_NOT_FOUND;
- }
- int status_code = HTTP_STATUS_OK;
- bool has_range = false;
- FileCache::OpenParam param;
- long from, to = 0;
- // Range:
- if (req->GetRange(from, to)) {
- has_range = true;
- if (file.open(filepath.c_str(), "rb") != 0) {
- return HTTP_STATUS_NOT_FOUND;
- }
- long total = file.size();
- if (to == 0 || to >= total) to = total - 1;
- file.seek(from);
- resp->content_length = to - from + 1;
- resp->File(filepath.c_str(), false);
- resp->SetRange(from, to, total);
- if(resp->content_length < param.max_read) {
- // range with memory
- int nread = file.readrange(resp->body, from, to);
- file.close();
- if (nread != resp->content_length) {
- resp->content_length = 0;
- resp->Reset();
- return HTTP_STATUS_INTERNAL_SERVER_ERROR;
- }
- return HTTP_STATUS_PARTIAL_CONTENT;
- }
- else { // range with file cache
- writer->WriteStatus(HTTP_STATUS_PARTIAL_CONTENT);
- writer->EndHeaders();
- return HTTP_STATUS_UNFINISHED;
- }
- }
- param.need_read = !(req->method == HTTP_HEAD || has_range);
- param.path = req_path;
- fc = files->Open(filepath.c_str(), ¶m);
- if (fc == NULL) {
- // status_code = HTTP_STATUS_NOT_FOUND;
- if (param.error == ERR_OVER_LIMIT) {
- /*
- if (service->largeFileHandler) {
- status_code = customHttpHandler(service->largeFileHandler);
- }
- */
- if (file.open(filepath.c_str(), "rb") != 0) {
- return HTTP_STATUS_NOT_FOUND;
- }
-
- // use file cache for large file
- resp->content_length = file.size();
- resp->File(filepath.c_str(), false);
- writer->WriteStatus(HTTP_STATUS_OK);
- writer->EndHeaders();
- return HTTP_STATUS_UNFINISHED;
- }
- }
- else {
- // Not Modified
- auto iter = req->headers.find("if-not-match");
- if (iter != req->headers.end() &&
- strcmp(iter->second.c_str(), fc->etag) == 0) {
- fc = NULL;
- return HTTP_STATUS_NOT_MODIFIED;
- }
- iter = req->headers.find("if-modified-since");
- if (iter != req->headers.end() &&
- strcmp(iter->second.c_str(), fc->last_modified) == 0) {
- fc = NULL;
- return HTTP_STATUS_NOT_MODIFIED;
- }
- }
- return status_code;
- }
- int HttpHandler::defaultErrorHandler() {
- // error page
- if (service->error_page.size() != 0) {
- std::string filepath = service->document_root + '/' + service->error_page;
- FileCache::OpenParam param;
- // load error page from file cache..
- fc = files->Open(filepath.c_str(), ¶m);
- }
- // status page
- if (fc == NULL && resp->body.size() == 0) {
- resp->content_type = TEXT_HTML;
- make_http_status_page(resp->status_code, resp->body);
- }
- return 0;
- }
- int HttpHandler::FeedRecvData(const char* data, size_t len) {
- int nfeed = 0;
- if (protocol == HttpHandler::WEBSOCKET) {
- nfeed = ws_parser->FeedRecvData(data, len);
- if (nfeed != len) {
- hloge("[%s:%d] websocket parse error!", ip, port);
- }
- } else {
- if (state != WANT_RECV) {
- Reset();
- }
- nfeed = parser->FeedRecvData(data, len);
- if (nfeed != len) {
- hloge("[%s:%d] http parse error: %s", ip, port, parser->StrError(parser->GetError()));
- }
- }
- return nfeed;
- }
- int HttpHandler::GetSendData(char** data, size_t* len) {
- if (state == HANDLE_CONTINUE) {
- return 0;
- }
- HttpRequest* pReq = req.get();
- HttpResponse* pResp = resp.get();
- if (protocol == HTTP_V1) {
- switch(state) {
- case WANT_RECV:
- if (parser->IsComplete()) state = WANT_SEND;
- else return 0;
- case HANDLE_END:
- state = WANT_SEND;
- case WANT_SEND:
- state = SEND_HEADER;
- case SEND_HEADER:
- {
- // HEAD
- if (pReq->method == HTTP_HEAD) {
- if (fc) {
- pResp->headers["Accept-Ranges"] = "bytes";
- pResp->headers["Content-Length"] = hv::to_string(fc->st.st_size);
- } else {
- pResp->headers["Content-Type"] = "text/html";
- pResp->headers["Content-Length"] = "0";
- }
- state = SEND_DONE;
- pResp->content_length = 0;
- goto return_header;
- }
- // File service
- if (fc) {
- // FileCache
- // NOTE: no copy filebuf, more efficient
- header = pResp->Dump(true, false);
- fc->prepend_header(header.c_str(), header.size());
- *data = fc->httpbuf.base;
- *len = fc->httpbuf.len;
- state = SEND_DONE;
- return *len;
- }
- // API service
- if (const char* content = (const char*)pResp->Content()) {
- int content_length = pResp->ContentLength();
- if (content_length > (1 << 20)) {
- state = SEND_BODY;
- } else {
- // NOTE: header+body in one package if <= 1M
- header = pResp->Dump(true, false);
- header.append(content, content_length);
- state = SEND_DONE;
- }
- } else {
- state = SEND_DONE;
- }
- return_header:
- if (header.empty()) header = pResp->Dump(true, false);
- *data = (char*)header.c_str();
- *len = header.size();
- return *len;
- }
- case SEND_BODY:
- {
- *data = (char*)pResp->Content();
- *len = pResp->ContentLength();
- state = SEND_DONE;
- return *len;
- }
- case SEND_DONE:
- {
- // NOTE: remove file cache if > 16M
- if (fc && fc->filebuf.len > FILE_CACHE_MAX_SIZE) {
- files->Close(fc);
- }
- fc = NULL;
- header.clear();
- file.close();
- return 0;
- }
- default:
- return 0;
- }
- } else if (protocol == HTTP_V2) {
- return parser->GetSendData(data, len);
- }
- return 0;
- }
- void HttpHandler::flushFile() {
- if(!writer || !file.isopen())
- return ;
- int len = 40960; // 416K
- #if 0
- socklen_t optlen = sizeof(len);
- getsockopt(writer->fd(), SOL_SOCKET, SO_SNDBUF, (char*)&len, &optlen);
- if(len < 4096) len = 4096;
- len++;
- #endif
- char* buff = NULL;
- HV_ALLOC(buff, len);
- flushing_ = true;
- last_flush_time = gettick_ms();
- while (resp->content_length > 0) {
- size_t nread = file.read(buff, len);
- if (nread <= 0) {
- hlogi("%p flushFile finish\n", this);
- file.close();
- state = SEND_DONE;
- break;
- }
- int ret = writer->write(buff, nread);
- if (ret < 0) {
- hlogi("%p flushFile netwrite error %d\n", this, ret);
- state = SEND_DONE;
- file.close();
- break;
- }
- else {
- last_flush_size += ret;
- resp->content_length -= ret;
- if (ret != nread) {
- hlogd("%p flushFile %d, file cur %d, %d remain\n", this, last_flush_size, file.tell(), resp->content_length);
- break;
- }
- }
- }
- HV_FREE(buff);
- flushing_ = false;
- }
- void HttpHandler::onWrite(hv::Buffer* buf) {
- //printf("%p onWrite %d\n", this, buf->len);
- if (protocol == HTTP_V1 && file.isopen()) {
- if (writer->isWriteComplete() && !flushing_) {
- int tick = 1;
- int ms_delta = gettick_ms() - last_flush_time;
- if (service->file_speed > 0) {
- tick = last_flush_size / service->file_speed - ms_delta;
- // timeout_ms can't be 0
- if(tick < 1) tick = 1;
- }
- hlogd("%p flushFile after %d ms, speed %d kB/s\n", this, tick, last_flush_size/(ms_delta + tick));
- flush_timer = hv::setTimeout(tick, std::bind(&HttpHandler::flushFile, this));
- last_flush_size = 0;
- }
- }
- }
|