AsyncHttpClient.cpp 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187
  1. #include "AsyncHttpClient.h"
  2. namespace hv {
  3. // createsocket => startConnect =>
  4. // onconnect => sendRequest => startRead =>
  5. // onread => HttpParser => resp_cb
  6. int AsyncHttpClient::doTask(const HttpClientTaskPtr& task) {
  7. const HttpRequestPtr& req = task->req;
  8. // queueInLoop timeout?
  9. uint64_t now_hrtime = hloop_now_hrtime(EventLoopThread::hloop());
  10. int elapsed_ms = (now_hrtime - task->start_time) / 1000;
  11. int timeout_ms = req->timeout * 1000;
  12. if (timeout_ms > 0 && elapsed_ms >= timeout_ms) {
  13. hlogw("%s queueInLoop timeout!", req->url.c_str());
  14. return -10;
  15. }
  16. req->ParseUrl();
  17. sockaddr_u peeraddr;
  18. memset(&peeraddr, 0, sizeof(peeraddr));
  19. const char* host = req->host.c_str();
  20. int ret = sockaddr_set_ipport(&peeraddr, host, req->port);
  21. if (ret != 0) {
  22. hloge("unknown host %s", host);
  23. return -20;
  24. }
  25. int connfd = -1;
  26. // first get from conn_pools
  27. char strAddr[SOCKADDR_STRLEN] = {0};
  28. SOCKADDR_STR(&peeraddr, strAddr);
  29. auto iter = conn_pools.find(strAddr);
  30. if (iter != conn_pools.end()) {
  31. // hlogd("get from conn_pools");
  32. iter->second.get(connfd);
  33. }
  34. if (connfd < 0) {
  35. // create socket
  36. connfd = socket(peeraddr.sa.sa_family, SOCK_STREAM, 0);
  37. if (connfd < 0) {
  38. perror("socket");
  39. return -30;
  40. }
  41. hio_t* connio = hio_get(EventLoopThread::hloop(), connfd);
  42. assert(connio != NULL);
  43. hio_set_peeraddr(connio, &peeraddr.sa, sockaddr_len(&peeraddr));
  44. addChannel(connio);
  45. // https
  46. if (req->IsHttps() && !req->IsProxy()) {
  47. hio_enable_ssl(connio);
  48. if (!is_ipaddr(host)) {
  49. hio_set_hostname(connio, host);
  50. }
  51. }
  52. }
  53. const SocketChannelPtr& channel = getChannel(connfd);
  54. assert(channel != NULL);
  55. HttpClientContext* ctx = channel->getContext<HttpClientContext>();
  56. ctx->task = task;
  57. channel->onconnect = [&channel]() {
  58. sendRequest(channel);
  59. };
  60. channel->onread = [this, &channel](Buffer* buf) {
  61. HttpClientContext* ctx = channel->getContext<HttpClientContext>();
  62. if (ctx->task == NULL) return;
  63. const char* data = (const char*)buf->data();
  64. int len = buf->size();
  65. int nparse = ctx->parser->FeedRecvData(data, len);
  66. if (nparse != len) {
  67. ctx->errorCallback();
  68. channel->close();
  69. return;
  70. }
  71. if (ctx->parser->IsComplete()) {
  72. auto& req = ctx->task->req;
  73. auto& resp = ctx->resp;
  74. bool keepalive = req->IsKeepAlive() && resp->IsKeepAlive();
  75. if (req->redirect && HTTP_STATUS_IS_REDIRECT(resp->status_code)) {
  76. std::string location = resp->headers["Location"];
  77. if (!location.empty()) {
  78. hlogi("redirect %s => %s", req->url.c_str(), location.c_str());
  79. req->url = location;
  80. req->ParseUrl();
  81. req->headers["Host"] = req->host;
  82. resp->Reset();
  83. send(ctx->task);
  84. }
  85. } else {
  86. ctx->successCallback();
  87. }
  88. if (keepalive) {
  89. // NOTE: add into conn_pools to reuse
  90. // hlogd("add into conn_pools");
  91. conn_pools[channel->peeraddr()].add(channel->fd());
  92. } else {
  93. channel->close();
  94. }
  95. }
  96. };
  97. channel->onclose = [this, &channel]() {
  98. HttpClientContext* ctx = channel->getContext<HttpClientContext>();
  99. // NOTE: remove from conn_pools
  100. // hlogd("remove from conn_pools");
  101. auto iter = conn_pools.find(channel->peeraddr());
  102. if (iter != conn_pools.end()) {
  103. iter->second.remove(channel->fd());
  104. }
  105. const HttpClientTaskPtr& task = ctx->task;
  106. if (task && task->req && task->req->retry_count-- > 0) {
  107. if (task->req->retry_delay > 0) {
  108. // try again after delay
  109. setTimeout(task->req->retry_delay, [this, task](TimerID timerID){
  110. hlogi("retry %s %s", http_method_str(task->req->method), task->req->url.c_str());
  111. sendInLoop(task);
  112. });
  113. } else {
  114. send(task);
  115. }
  116. } else {
  117. ctx->errorCallback();
  118. }
  119. removeChannel(channel);
  120. };
  121. // timer
  122. if (timeout_ms > 0) {
  123. ctx->timerID = setTimeout(timeout_ms - elapsed_ms, [&channel](TimerID timerID){
  124. HttpClientContext* ctx = channel->getContext<HttpClientContext>();
  125. assert(ctx->task != NULL);
  126. hlogw("%s timeout!", ctx->task->req->url.c_str());
  127. if (channel) {
  128. channel->close();
  129. }
  130. });
  131. }
  132. if (channel->isConnected()) {
  133. // sendRequest
  134. sendRequest(channel);
  135. } else {
  136. // startConnect
  137. if (req->connect_timeout > 0) {
  138. channel->setConnectTimeout(req->connect_timeout * 1000);
  139. }
  140. channel->startConnect();
  141. }
  142. return 0;
  143. }
  144. // InitResponse => SubmitRequest => while(GetSendData) write => startRead
  145. int AsyncHttpClient::sendRequest(const SocketChannelPtr& channel) {
  146. HttpClientContext* ctx = (HttpClientContext*)channel->context();
  147. assert(ctx != NULL && ctx->task != NULL);
  148. HttpRequest* req = ctx->task->req.get();
  149. HttpResponse* resp = ctx->resp.get();
  150. if (ctx->parser == NULL) {
  151. ctx->parser.reset(HttpParser::New(HTTP_CLIENT, (http_version)req->http_major));
  152. }
  153. if (resp == NULL) {
  154. resp = new HttpResponse;
  155. ctx->resp.reset(resp);
  156. }
  157. if (req->http_cb) resp->http_cb = std::move(req->http_cb);
  158. ctx->parser->InitResponse(resp);
  159. ctx->parser->SubmitRequest(req);
  160. char* data = NULL;
  161. size_t len = 0;
  162. while (ctx->parser->GetSendData(&data, &len)) {
  163. // NOTE: ensure write buffer size is enough
  164. if (len > (1 << 22) /* 4M */) {
  165. channel->setMaxWriteBufsize(len);
  166. }
  167. channel->write(data, len);
  168. }
  169. channel->startRead();
  170. return 0;
  171. }
  172. }