HttpHandler.cpp 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493
  1. #include "HttpHandler.h"
  2. #include "hbase.h"
  3. #include "herr.h"
  4. #include "hlog.h"
  5. #include "hasync.h" // import hv::async for http_async_handler
  6. #include "http_page.h"
  7. #include "EventLoop.h"
  8. #include "htime.h"
  9. bool HttpHandler::SwitchWebSocket(hio_t* io, ws_session_type type) {
  10. if(!io || !ws_service) return false;
  11. protocol = WEBSOCKET;
  12. ws_parser.reset(new WebSocketParser);
  13. ws_channel.reset(new hv::WebSocketChannel(io, type));
  14. ws_parser->onMessage = [this](int opcode, const std::string& msg){
  15. switch(opcode) {
  16. case WS_OPCODE_CLOSE:
  17. ws_channel->close(true);
  18. break;
  19. case WS_OPCODE_PING:
  20. // printf("recv ping\n");
  21. // printf("send pong\n");
  22. ws_channel->sendPong();
  23. break;
  24. case WS_OPCODE_PONG:
  25. // printf("recv pong\n");
  26. this->last_recv_pong_time = gethrtime_us();
  27. break;
  28. case WS_OPCODE_TEXT:
  29. case WS_OPCODE_BINARY:
  30. // onmessage
  31. if (ws_service && ws_service->onmessage) {
  32. ws_service->onmessage(ws_channel, msg);
  33. }
  34. break;
  35. default:
  36. break;
  37. }
  38. };
  39. // NOTE: cancel keepalive timer, judge alive by heartbeat.
  40. hio_set_keepalive_timeout(io, 0);
  41. if (ws_service && ws_service->ping_interval > 0) {
  42. int ping_interval = MAX(ws_service->ping_interval, 1000);
  43. ws_channel->setHeartbeat(ping_interval, [this](){
  44. if (last_recv_pong_time < last_send_ping_time) {
  45. hlogw("[%s:%d] websocket no pong!", ip, port);
  46. ws_channel->close(true);
  47. } else {
  48. // printf("send ping\n");
  49. ws_channel->sendPing();
  50. last_send_ping_time = gethrtime_us();
  51. }
  52. });
  53. }
  54. // onopen
  55. WebSocketOnOpen();
  56. return true;
  57. }
  58. HttpHandler::HttpHandler()
  59. {
  60. protocol = UNKNOWN;
  61. state = WANT_RECV;
  62. ssl = false;
  63. service = NULL;
  64. files = NULL;
  65. ws_service = NULL;
  66. last_send_ping_time = 0;
  67. last_recv_pong_time = 0;
  68. flushing_ = false;
  69. last_flush_size = 0;
  70. last_flush_time = 0;
  71. flush_timer = 0;
  72. }
  73. HttpHandler::~HttpHandler() {
  74. if (writer) {
  75. writer->status = hv::SocketChannel::DISCONNECTED;
  76. }
  77. resetFlush();
  78. }
  79. void HttpHandler::resetFlush(){
  80. file.close();
  81. if(flush_timer){
  82. hv::killTimer(flush_timer);
  83. flush_timer = 0;
  84. }
  85. }
  86. int HttpHandler::customHttpHandler(const http_handler& handler) {
  87. return invokeHttpHandler(&handler);
  88. }
  89. int HttpHandler::invokeHttpHandler(const http_handler* handler) {
  90. int status_code = HTTP_STATUS_NOT_IMPLEMENTED;
  91. if (handler->sync_handler) {
  92. // NOTE: sync_handler run on IO thread
  93. status_code = handler->sync_handler(req.get(), resp.get());
  94. } else if (handler->async_handler) {
  95. // NOTE: async_handler run on hv::async threadpool
  96. hv::async(std::bind(handler->async_handler, req, writer));
  97. status_code = HTTP_STATUS_UNFINISHED;
  98. } else if (handler->ctx_handler) {
  99. HttpContextPtr ctx(new hv::HttpContext);
  100. ctx->service = service;
  101. ctx->request = req;
  102. ctx->response = resp;
  103. ctx->writer = writer;
  104. // NOTE: ctx_handler run on IO thread, you can easily post HttpContextPtr to your consumer thread for processing.
  105. status_code = handler->ctx_handler(ctx);
  106. if (writer && writer->state != hv::HttpResponseWriter::SEND_BEGIN) {
  107. status_code = HTTP_STATUS_UNFINISHED;
  108. }
  109. }
  110. return status_code;
  111. }
  112. int HttpHandler::HandleHttpRequest() {
  113. // preprocessor -> processor -> postprocessor
  114. int status_code = HTTP_STATUS_OK;
  115. HttpRequest* pReq = req.get();
  116. HttpResponse* pResp = resp.get();
  117. pReq->scheme = ssl ? "https" : "http";
  118. pReq->client_addr.ip = ip;
  119. pReq->client_addr.port = port;
  120. pReq->Host();
  121. pReq->ParseUrl();
  122. // NOTE: Not all users want to parse body, we comment it out.
  123. // pReq->ParseBody();
  124. preprocessor:
  125. state = HANDLE_BEGIN;
  126. if (service->preprocessor) {
  127. status_code = customHttpHandler(service->preprocessor);
  128. if (status_code != 0) {
  129. goto postprocessor;
  130. }
  131. }
  132. processor:
  133. if (service->processor) {
  134. status_code = customHttpHandler(service->processor);
  135. } else {
  136. status_code = defaultRequestHandler();
  137. }
  138. postprocessor:
  139. if (status_code >= 100 && status_code < 600) {
  140. pResp->status_code = (http_status)status_code;
  141. }
  142. if (pResp->status_code >= 400 && pResp->body.size() == 0 && pReq->method != HTTP_HEAD) {
  143. if (service->errorHandler) {
  144. customHttpHandler(service->errorHandler);
  145. } else {
  146. defaultErrorHandler();
  147. }
  148. }
  149. if (fc) {
  150. pResp->content = fc->filebuf.base;
  151. pResp->content_length = fc->filebuf.len;
  152. pResp->headers["Content-Type"] = fc->content_type;
  153. pResp->headers["Last-Modified"] = fc->last_modified;
  154. pResp->headers["Etag"] = fc->etag;
  155. }
  156. if (service->postprocessor) {
  157. customHttpHandler(service->postprocessor);
  158. }
  159. if (status_code == 0) {
  160. state = HANDLE_CONTINUE;
  161. } else {
  162. state = HANDLE_END;
  163. parser->SubmitResponse(resp.get());
  164. }
  165. return status_code;
  166. }
  167. int HttpHandler::defaultRequestHandler() {
  168. int status_code = HTTP_STATUS_OK;
  169. http_handler* handler = NULL;
  170. if (service->api_handlers.size() != 0) {
  171. service->GetApi(req.get(), &handler);
  172. }
  173. if (handler) {
  174. status_code = invokeHttpHandler(handler);
  175. }
  176. else if (req->method == HTTP_GET || req->method == HTTP_HEAD) {
  177. // static handler
  178. if (service->staticHandler) {
  179. status_code = customHttpHandler(service->staticHandler);
  180. }
  181. else if (service->document_root.size() != 0) {
  182. status_code = defaultStaticHandler();
  183. }
  184. else {
  185. status_code = HTTP_STATUS_NOT_FOUND;
  186. }
  187. }
  188. else {
  189. // Not Implemented
  190. status_code = HTTP_STATUS_NOT_IMPLEMENTED;
  191. }
  192. return status_code;
  193. }
  194. int HttpHandler::defaultStaticHandler() {
  195. // file service
  196. std::string path = req->Path();
  197. const char* req_path = path.c_str();
  198. // path safe check
  199. if (req_path[0] != '/' || strstr(req_path, "/../")) {
  200. return HTTP_STATUS_BAD_REQUEST;
  201. }
  202. std::string filepath = service->document_root + path;
  203. if (req_path[1] == '\0') {
  204. filepath += service->home_page;
  205. }
  206. bool is_dir = filepath[filepath.size()-1] == '/';
  207. bool is_index_of = false;
  208. if (service->index_of.size() != 0 && hv_strstartswith(req_path, service->index_of.c_str())) {
  209. is_index_of = true;
  210. }
  211. if (is_dir && !is_index_of) { // unsupport dir without index
  212. return HTTP_STATUS_NOT_FOUND;
  213. }
  214. int status_code = HTTP_STATUS_OK;
  215. bool has_range = false;
  216. FileCache::OpenParam param;
  217. long from, to = 0;
  218. // Range:
  219. if (req->GetRange(from, to)) {
  220. has_range = true;
  221. if (file.open(filepath.c_str(), "rb") != 0) {
  222. return HTTP_STATUS_NOT_FOUND;
  223. }
  224. long total = file.size();
  225. if (to == 0 || to >= total) to = total - 1;
  226. file.seek(from);
  227. resp->content_length = to - from + 1;
  228. resp->File(filepath.c_str(), false);
  229. resp->SetRange(from, to, total);
  230. if(resp->content_length < param.max_read) {
  231. // range with memory
  232. int nread = file.readrange(resp->body, from, to);
  233. file.close();
  234. if (nread != resp->content_length) {
  235. resp->content_length = 0;
  236. resp->Reset();
  237. return HTTP_STATUS_INTERNAL_SERVER_ERROR;
  238. }
  239. return HTTP_STATUS_PARTIAL_CONTENT;
  240. }
  241. else { // range with file cache
  242. writer->WriteStatus(HTTP_STATUS_PARTIAL_CONTENT);
  243. writer->EndHeaders();
  244. return HTTP_STATUS_UNFINISHED;
  245. }
  246. }
  247. param.need_read = !(req->method == HTTP_HEAD || has_range);
  248. param.path = req_path;
  249. fc = files->Open(filepath.c_str(), &param);
  250. if (fc == NULL) {
  251. // status_code = HTTP_STATUS_NOT_FOUND;
  252. if (param.error == ERR_OVER_LIMIT) {
  253. /*
  254. if (service->largeFileHandler) {
  255. status_code = customHttpHandler(service->largeFileHandler);
  256. }
  257. */
  258. if (file.open(filepath.c_str(), "rb") != 0) {
  259. return HTTP_STATUS_NOT_FOUND;
  260. }
  261. // use file cache for large file
  262. resp->content_length = file.size();
  263. resp->File(filepath.c_str(), false);
  264. writer->WriteStatus(HTTP_STATUS_OK);
  265. writer->EndHeaders();
  266. return HTTP_STATUS_UNFINISHED;
  267. }
  268. }
  269. else {
  270. // Not Modified
  271. auto iter = req->headers.find("if-not-match");
  272. if (iter != req->headers.end() &&
  273. strcmp(iter->second.c_str(), fc->etag) == 0) {
  274. fc = NULL;
  275. return HTTP_STATUS_NOT_MODIFIED;
  276. }
  277. iter = req->headers.find("if-modified-since");
  278. if (iter != req->headers.end() &&
  279. strcmp(iter->second.c_str(), fc->last_modified) == 0) {
  280. fc = NULL;
  281. return HTTP_STATUS_NOT_MODIFIED;
  282. }
  283. }
  284. return status_code;
  285. }
  286. int HttpHandler::defaultErrorHandler() {
  287. // error page
  288. if (service->error_page.size() != 0) {
  289. std::string filepath = service->document_root + '/' + service->error_page;
  290. FileCache::OpenParam param;
  291. // load error page from file cache..
  292. fc = files->Open(filepath.c_str(), &param);
  293. }
  294. // status page
  295. if (fc == NULL && resp->body.size() == 0) {
  296. resp->content_type = TEXT_HTML;
  297. make_http_status_page(resp->status_code, resp->body);
  298. }
  299. return 0;
  300. }
  301. int HttpHandler::FeedRecvData(const char* data, size_t len) {
  302. int nfeed = 0;
  303. if (protocol == HttpHandler::WEBSOCKET) {
  304. nfeed = ws_parser->FeedRecvData(data, len);
  305. if (nfeed != len) {
  306. hloge("[%s:%d] websocket parse error!", ip, port);
  307. }
  308. } else {
  309. if (state != WANT_RECV) {
  310. Reset();
  311. }
  312. nfeed = parser->FeedRecvData(data, len);
  313. if (nfeed != len) {
  314. hloge("[%s:%d] http parse error: %s", ip, port, parser->StrError(parser->GetError()));
  315. }
  316. }
  317. return nfeed;
  318. }
  319. int HttpHandler::GetSendData(char** data, size_t* len) {
  320. if (state == HANDLE_CONTINUE) {
  321. return 0;
  322. }
  323. HttpRequest* pReq = req.get();
  324. HttpResponse* pResp = resp.get();
  325. if (protocol == HTTP_V1) {
  326. switch(state) {
  327. case WANT_RECV:
  328. if (parser->IsComplete()) state = WANT_SEND;
  329. else return 0;
  330. case HANDLE_END:
  331. state = WANT_SEND;
  332. case WANT_SEND:
  333. state = SEND_HEADER;
  334. case SEND_HEADER:
  335. {
  336. // HEAD
  337. if (pReq->method == HTTP_HEAD) {
  338. if (fc) {
  339. pResp->headers["Accept-Ranges"] = "bytes";
  340. pResp->headers["Content-Length"] = hv::to_string(fc->st.st_size);
  341. } else {
  342. pResp->headers["Content-Type"] = "text/html";
  343. pResp->headers["Content-Length"] = "0";
  344. }
  345. state = SEND_DONE;
  346. pResp->content_length = 0;
  347. goto return_header;
  348. }
  349. // File service
  350. if (fc) {
  351. // FileCache
  352. // NOTE: no copy filebuf, more efficient
  353. header = pResp->Dump(true, false);
  354. fc->prepend_header(header.c_str(), header.size());
  355. *data = fc->httpbuf.base;
  356. *len = fc->httpbuf.len;
  357. state = SEND_DONE;
  358. return *len;
  359. }
  360. // API service
  361. if (const char* content = (const char*)pResp->Content()) {
  362. int content_length = pResp->ContentLength();
  363. if (content_length > (1 << 20)) {
  364. state = SEND_BODY;
  365. } else {
  366. // NOTE: header+body in one package if <= 1M
  367. header = pResp->Dump(true, false);
  368. header.append(content, content_length);
  369. state = SEND_DONE;
  370. }
  371. } else {
  372. state = SEND_DONE;
  373. }
  374. return_header:
  375. if (header.empty()) header = pResp->Dump(true, false);
  376. *data = (char*)header.c_str();
  377. *len = header.size();
  378. return *len;
  379. }
  380. case SEND_BODY:
  381. {
  382. *data = (char*)pResp->Content();
  383. *len = pResp->ContentLength();
  384. state = SEND_DONE;
  385. return *len;
  386. }
  387. case SEND_DONE:
  388. {
  389. // NOTE: remove file cache if > 16M
  390. if (fc && fc->filebuf.len > FILE_CACHE_MAX_SIZE) {
  391. files->Close(fc);
  392. }
  393. fc = NULL;
  394. header.clear();
  395. file.close();
  396. return 0;
  397. }
  398. default:
  399. return 0;
  400. }
  401. } else if (protocol == HTTP_V2) {
  402. return parser->GetSendData(data, len);
  403. }
  404. return 0;
  405. }
  406. void HttpHandler::flushFile() {
  407. if(!writer || !file.isopen())
  408. return ;
  409. int len = 40960; // 416K
  410. #if 0
  411. socklen_t optlen = sizeof(len);
  412. getsockopt(writer->fd(), SOL_SOCKET, SO_SNDBUF, (char*)&len, &optlen);
  413. if(len < 4096) len = 4096;
  414. len++;
  415. #endif
  416. char* buff = NULL;
  417. HV_ALLOC(buff, len);
  418. flushing_ = true;
  419. last_flush_time = gettick_ms();
  420. while (resp->content_length > 0) {
  421. size_t nread = file.read(buff, len);
  422. if (nread <= 0) {
  423. hlogi("%p flushFile finish\n", this);
  424. file.close();
  425. state = SEND_DONE;
  426. break;
  427. }
  428. int ret = writer->write(buff, nread);
  429. if (ret < 0) {
  430. hlogi("%p flushFile netwrite error %d\n", this, ret);
  431. state = SEND_DONE;
  432. file.close();
  433. break;
  434. }
  435. else {
  436. last_flush_size += ret;
  437. resp->content_length -= ret;
  438. if (ret != nread) {
  439. hlogd("%p flushFile %d, file cur %d, %d remain\n", this, last_flush_size, file.tell(), resp->content_length);
  440. break;
  441. }
  442. }
  443. }
  444. HV_FREE(buff);
  445. flushing_ = false;
  446. }
  447. void HttpHandler::onWrite(hv::Buffer* buf) {
  448. //printf("%p onWrite %d\n", this, buf->len);
  449. if (protocol == HTTP_V1 && file.isopen()) {
  450. if (writer->isWriteComplete() && !flushing_) {
  451. int tick = 1;
  452. int ms_delta = gettick_ms() - last_flush_time;
  453. if (service->file_speed > 0) {
  454. tick = last_flush_size / service->file_speed - ms_delta;
  455. // timeout_ms can't be 0
  456. if(tick < 1) tick = 1;
  457. }
  458. hlogd("%p flushFile after %d ms, speed %d kB/s\n", this, tick, last_flush_size/(ms_delta + tick));
  459. flush_timer = hv::setTimeout(tick, std::bind(&HttpHandler::flushFile, this));
  460. last_flush_size = 0;
  461. }
  462. }
  463. }