AsyncHttpClient.cpp 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153
  1. #include "AsyncHttpClient.h"
  2. namespace hv {
  3. // createsocket => startConnect =>
  4. // onconnect => sendRequest => startRead =>
  5. // onread => HttpParser => resp_cb
  6. int AsyncHttpClient::doTask(const HttpClientTaskPtr& task) {
  7. const HttpRequestPtr& req = task->req;
  8. // queueInLoop timeout?
  9. uint64_t now_hrtime = hloop_now_hrtime(loop_thread.hloop());
  10. int elapsed_ms = (now_hrtime - task->start_time) / 1000;
  11. int timeout_ms = req->timeout * 1000;
  12. if (timeout_ms > 0 && elapsed_ms >= timeout_ms) {
  13. hlogw("%s queueInLoop timeout!", req->url.c_str());
  14. return -10;
  15. }
  16. req->ParseUrl();
  17. sockaddr_u peeraddr;
  18. memset(&peeraddr, 0, sizeof(peeraddr));
  19. int ret = sockaddr_set_ipport(&peeraddr, req->host.c_str(), req->port);
  20. if (ret != 0) {
  21. hloge("unknown host %s", req->host.c_str());
  22. return -20;
  23. }
  24. int connfd = -1;
  25. // first get from conn_pools
  26. char strAddr[SOCKADDR_STRLEN] = {0};
  27. SOCKADDR_STR(&peeraddr, strAddr);
  28. auto iter = conn_pools.find(strAddr);
  29. if (iter != conn_pools.end()) {
  30. // hlogd("get from conn_pools");
  31. iter->second.get(connfd);
  32. }
  33. if (connfd < 0) {
  34. // create socket
  35. connfd = socket(peeraddr.sa.sa_family, SOCK_STREAM, 0);
  36. if (connfd < 0) {
  37. perror("socket");
  38. return -30;
  39. }
  40. hio_t* connio = hio_get(loop_thread.hloop(), connfd);
  41. assert(connio != NULL);
  42. hio_set_peeraddr(connio, &peeraddr.sa, sockaddr_len(&peeraddr));
  43. addChannel(connio);
  44. // https
  45. if (req->https) {
  46. hio_enable_ssl(connio);
  47. }
  48. }
  49. const SocketChannelPtr& channel = getChannel(connfd);
  50. assert(channel != NULL);
  51. HttpClientContext* ctx = channel->getContext<HttpClientContext>();
  52. ctx->task = task;
  53. channel->onconnect = [this, &channel]() {
  54. sendRequest(channel);
  55. };
  56. channel->onread = [this, &channel](Buffer* buf) {
  57. HttpClientContext* ctx = channel->getContext<HttpClientContext>();
  58. if (ctx->task == NULL) return;
  59. const char* data = (const char*)buf->data();
  60. int len = buf->size();
  61. int nparse = ctx->parser->FeedRecvData(data, len);
  62. if (nparse != len) {
  63. ctx->errorCallback();
  64. channel->close();
  65. return;
  66. }
  67. if (ctx->parser->IsComplete()) {
  68. std::string req_connection = ctx->task->req->GetHeader("Connection");
  69. std::string resp_connection = ctx->resp->GetHeader("Connection");
  70. ctx->successCallback();
  71. if (stricmp(req_connection.c_str(), "keep-alive") == 0 &&
  72. stricmp(resp_connection.c_str(), "keep-alive") == 0) {
  73. // NOTE: add into conn_pools to reuse
  74. // hlogd("add into conn_pools");
  75. conn_pools[channel->peeraddr()].add(channel->fd());
  76. } else {
  77. channel->close();
  78. }
  79. }
  80. };
  81. channel->onclose = [this, &channel]() {
  82. HttpClientContext* ctx = channel->getContext<HttpClientContext>();
  83. // NOTE: remove from conn_pools
  84. // hlogd("remove from conn_pools");
  85. auto iter = conn_pools.find(channel->peeraddr());
  86. if (iter != conn_pools.end()) {
  87. iter->second.remove(channel->fd());
  88. }
  89. if (ctx->task && ctx->task->retry_cnt-- > 0) {
  90. // try again
  91. send(ctx->task);
  92. } else {
  93. ctx->errorCallback();
  94. }
  95. removeChannel(channel);
  96. };
  97. // timer
  98. if (timeout_ms > 0) {
  99. ctx->timerID = setTimeout(timeout_ms - elapsed_ms, [&channel](TimerID timerID){
  100. HttpClientContext* ctx = channel->getContext<HttpClientContext>();
  101. assert(ctx->task != NULL);
  102. hlogw("%s timeout!", ctx->task->req->url.c_str());
  103. if (channel) {
  104. channel->close();
  105. }
  106. });
  107. }
  108. if (channel->isConnected()) {
  109. // sendRequest
  110. sendRequest(channel);
  111. } else {
  112. // startConnect
  113. channel->startConnect();
  114. }
  115. return 0;
  116. }
  117. // InitResponse => SubmitRequest => while(GetSendData) write => startRead
  118. int AsyncHttpClient::sendRequest(const SocketChannelPtr& channel) {
  119. HttpClientContext* ctx = (HttpClientContext*)channel->context();
  120. assert(ctx != NULL && ctx->task != NULL);
  121. if (ctx->parser == NULL) {
  122. ctx->parser.reset(HttpParser::New(HTTP_CLIENT, (http_version)ctx->task->req->http_major));
  123. }
  124. if (ctx->resp == NULL) {
  125. ctx->resp.reset(new HttpResponse);
  126. }
  127. ctx->parser->InitResponse(ctx->resp.get());
  128. ctx->parser->SubmitRequest(ctx->task->req.get());
  129. char* data = NULL;
  130. size_t len = 0;
  131. while (ctx->parser->GetSendData(&data, &len)) {
  132. Buffer buf(data, len);
  133. channel->write(&buf);
  134. }
  135. channel->startRead();
  136. return 0;
  137. }
  138. }