AsyncHttpClient.cpp 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212
  1. #include "AsyncHttpClient.h"
  2. namespace hv {
  3. // createsocket => startConnect =>
  4. // onconnect => sendRequest => startRead =>
  5. // onread => HttpParser => resp_cb
  6. int AsyncHttpClient::doTask(const HttpClientTaskPtr& task) {
  7. const HttpRequestPtr& req = task->req;
  8. if (req->cancel) {
  9. return -1;
  10. }
  11. // queueInLoop timeout?
  12. uint64_t now_hrtime = hloop_now_hrtime(EventLoopThread::hloop());
  13. int elapsed_ms = (now_hrtime - task->start_time) / 1000;
  14. int timeout_ms = req->timeout * 1000;
  15. if (timeout_ms > 0 && elapsed_ms >= timeout_ms) {
  16. hlogw("%s queueInLoop timeout!", req->url.c_str());
  17. return -10;
  18. }
  19. req->ParseUrl();
  20. sockaddr_u peeraddr;
  21. memset(&peeraddr, 0, sizeof(peeraddr));
  22. const char* host = req->host.c_str();
  23. int ret = sockaddr_set_ipport(&peeraddr, host, req->port);
  24. if (ret != 0) {
  25. hloge("unknown host %s", host);
  26. return -20;
  27. }
  28. int connfd = -1;
  29. // first get from conn_pools
  30. char strAddr[SOCKADDR_STRLEN] = {0};
  31. SOCKADDR_STR(&peeraddr, strAddr);
  32. auto iter = conn_pools.find(strAddr);
  33. if (iter != conn_pools.end()) {
  34. // hlogd("get from conn_pools");
  35. iter->second.get(connfd);
  36. }
  37. if (connfd < 0) {
  38. // create socket
  39. connfd = socket(peeraddr.sa.sa_family, SOCK_STREAM, 0);
  40. if (connfd < 0) {
  41. perror("socket");
  42. return -30;
  43. }
  44. hio_t* connio = hio_get(EventLoopThread::hloop(), connfd);
  45. assert(connio != NULL);
  46. hio_set_peeraddr(connio, &peeraddr.sa, sockaddr_len(&peeraddr));
  47. addChannel(connio);
  48. // https
  49. if (req->IsHttps() && !req->IsProxy()) {
  50. hio_enable_ssl(connio);
  51. if (!is_ipaddr(host)) {
  52. hio_set_hostname(connio, host);
  53. }
  54. }
  55. }
  56. const SocketChannelPtr& channel = getChannel(connfd);
  57. assert(channel != NULL);
  58. HttpClientContext* ctx = channel->getContext<HttpClientContext>();
  59. ctx->task = task;
  60. channel->onconnect = [&channel]() {
  61. sendRequest(channel);
  62. };
  63. channel->onread = [this, &channel](Buffer* buf) {
  64. HttpClientContext* ctx = channel->getContext<HttpClientContext>();
  65. if (ctx->task == NULL) return;
  66. if (ctx->task->req->cancel) {
  67. channel->close();
  68. return;
  69. }
  70. const char* data = (const char*)buf->data();
  71. int len = buf->size();
  72. int nparse = ctx->parser->FeedRecvData(data, len);
  73. if (nparse != len) {
  74. ctx->errorCallback();
  75. channel->close();
  76. return;
  77. }
  78. if (ctx->parser->IsComplete()) {
  79. auto& req = ctx->task->req;
  80. auto& resp = ctx->resp;
  81. bool keepalive = req->IsKeepAlive() && resp->IsKeepAlive();
  82. if (req->redirect && HTTP_STATUS_IS_REDIRECT(resp->status_code)) {
  83. std::string location = resp->headers["Location"];
  84. if (!location.empty()) {
  85. hlogi("redirect %s => %s", req->url.c_str(), location.c_str());
  86. req->url = location;
  87. req->ParseUrl();
  88. req->headers["Host"] = req->host;
  89. resp->Reset();
  90. send(ctx->task);
  91. // NOTE: detatch from original channel->context
  92. ctx->cancelTask();
  93. }
  94. } else {
  95. ctx->successCallback();
  96. }
  97. if (keepalive) {
  98. // NOTE: add into conn_pools to reuse
  99. // hlogd("add into conn_pools");
  100. conn_pools[channel->peeraddr()].add(channel->fd());
  101. } else {
  102. channel->close();
  103. }
  104. }
  105. };
  106. channel->onclose = [this, &channel]() {
  107. HttpClientContext* ctx = channel->getContext<HttpClientContext>();
  108. // NOTE: remove from conn_pools
  109. // hlogd("remove from conn_pools");
  110. auto iter = conn_pools.find(channel->peeraddr());
  111. if (iter != conn_pools.end()) {
  112. iter->second.remove(channel->fd());
  113. }
  114. const HttpClientTaskPtr& task = ctx->task;
  115. if (task) {
  116. if (ctx->parser &&
  117. ctx->parser->IsEof()) {
  118. ctx->successCallback();
  119. }
  120. else if (task->req &&
  121. task->req->cancel == 0 &&
  122. task->req->retry_count-- > 0) {
  123. if (task->req->retry_delay > 0) {
  124. // try again after delay
  125. setTimeout(task->req->retry_delay, [this, task](TimerID timerID){
  126. hlogi("retry %s %s", http_method_str(task->req->method), task->req->url.c_str());
  127. sendInLoop(task);
  128. });
  129. } else {
  130. send(task);
  131. }
  132. }
  133. else {
  134. ctx->errorCallback();
  135. }
  136. }
  137. removeChannel(channel);
  138. };
  139. // timer
  140. if (timeout_ms > 0) {
  141. ctx->timerID = setTimeout(timeout_ms - elapsed_ms, [&channel](TimerID timerID){
  142. HttpClientContext* ctx = channel->getContext<HttpClientContext>();
  143. if (ctx && ctx->task) {
  144. hlogw("%s timeout!", ctx->task->req->url.c_str());
  145. }
  146. if (channel) {
  147. channel->close();
  148. }
  149. });
  150. }
  151. if (channel->isConnected()) {
  152. // sendRequest
  153. sendRequest(channel);
  154. } else {
  155. // startConnect
  156. if (req->connect_timeout > 0) {
  157. channel->setConnectTimeout(req->connect_timeout * 1000);
  158. }
  159. channel->startConnect();
  160. }
  161. return 0;
  162. }
  163. // InitResponse => SubmitRequest => while(GetSendData) write => startRead
  164. int AsyncHttpClient::sendRequest(const SocketChannelPtr& channel) {
  165. HttpClientContext* ctx = (HttpClientContext*)channel->context();
  166. assert(ctx != NULL && ctx->task != NULL);
  167. if (ctx->resp == NULL) {
  168. ctx->resp = std::make_shared<HttpResponse>();
  169. }
  170. HttpRequest* req = ctx->task->req.get();
  171. HttpResponse* resp = ctx->resp.get();
  172. assert(req != NULL && resp != NULL);
  173. if (req->http_cb) resp->http_cb = std::move(req->http_cb);
  174. if (ctx->parser == NULL) {
  175. ctx->parser.reset(HttpParser::New(HTTP_CLIENT, (http_version)req->http_major));
  176. }
  177. ctx->parser->InitResponse(resp);
  178. ctx->parser->SubmitRequest(req);
  179. char* data = NULL;
  180. size_t len = 0;
  181. while (ctx->parser->GetSendData(&data, &len)) {
  182. if (req->cancel) {
  183. channel->close();
  184. return -1;
  185. }
  186. // NOTE: ensure write buffer size is enough
  187. if (len > (1 << 22) /* 4M */) {
  188. channel->setMaxWriteBufsize(len);
  189. }
  190. channel->write(data, len);
  191. }
  192. channel->startRead();
  193. return 0;
  194. }
  195. }