AsyncHttpClient.cpp 7.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225
  1. #include "AsyncHttpClient.h"
  2. namespace hv {
  3. int AsyncHttpClient::send(const HttpRequestPtr& req, HttpResponseCallback resp_cb) {
  4. hloop_t* loop = EventLoopThread::hloop();
  5. if (loop == NULL) return -1;
  6. auto task = std::make_shared<HttpClientTask>();
  7. task->req = req;
  8. task->cb = std::move(resp_cb);
  9. task->start_time = hloop_now_hrtime(loop);
  10. if (req->retry_count > 0 && req->retry_delay > 0) {
  11. req->retry_count = MIN(req->retry_count, req->timeout * 1000 / req->retry_delay - 1);
  12. }
  13. return send(task);
  14. }
  15. // createsocket => startConnect =>
  16. // onconnect => sendRequest => startRead =>
  17. // onread => HttpParser => resp_cb
  18. int AsyncHttpClient::doTask(const HttpClientTaskPtr& task) {
  19. const HttpRequestPtr& req = task->req;
  20. if (req->cancel) {
  21. return -1;
  22. }
  23. // queueInLoop timeout?
  24. uint64_t now_hrtime = hloop_now_hrtime(EventLoopThread::hloop());
  25. int elapsed_ms = (now_hrtime - task->start_time) / 1000;
  26. int timeout_ms = req->timeout * 1000;
  27. if (timeout_ms > 0 && elapsed_ms >= timeout_ms) {
  28. hlogw("%s queueInLoop timeout!", req->url.c_str());
  29. return -10;
  30. }
  31. req->ParseUrl();
  32. sockaddr_u peeraddr;
  33. memset(&peeraddr, 0, sizeof(peeraddr));
  34. const char* host = req->host.c_str();
  35. int ret = sockaddr_set_ipport(&peeraddr, host, req->port);
  36. if (ret != 0) {
  37. hloge("unknown host %s", host);
  38. return -20;
  39. }
  40. int connfd = -1;
  41. // first get from conn_pools
  42. char strAddr[SOCKADDR_STRLEN] = {0};
  43. SOCKADDR_STR(&peeraddr, strAddr);
  44. auto iter = conn_pools.find(strAddr);
  45. if (iter != conn_pools.end()) {
  46. // hlogd("get from conn_pools");
  47. iter->second.get(connfd);
  48. }
  49. if (connfd < 0) {
  50. // create socket
  51. connfd = socket(peeraddr.sa.sa_family, SOCK_STREAM, 0);
  52. if (connfd < 0) {
  53. perror("socket");
  54. return -30;
  55. }
  56. hio_t* connio = hio_get(EventLoopThread::hloop(), connfd);
  57. assert(connio != NULL);
  58. hio_set_peeraddr(connio, &peeraddr.sa, sockaddr_len(&peeraddr));
  59. addChannel(connio);
  60. // https
  61. if (req->IsHttps() && !req->IsProxy()) {
  62. hio_enable_ssl(connio);
  63. if (!is_ipaddr(host)) {
  64. hio_set_hostname(connio, host);
  65. }
  66. }
  67. }
  68. const SocketChannelPtr& channel = getChannel(connfd);
  69. assert(channel != NULL);
  70. HttpClientContext* ctx = channel->getContext<HttpClientContext>();
  71. ctx->task = task;
  72. channel->onconnect = [&channel]() {
  73. sendRequest(channel);
  74. };
  75. channel->onread = [this, &channel](Buffer* buf) {
  76. HttpClientContext* ctx = channel->getContext<HttpClientContext>();
  77. if (ctx->task == NULL) return;
  78. if (ctx->task->req->cancel) {
  79. channel->close();
  80. return;
  81. }
  82. const char* data = (const char*)buf->data();
  83. int len = buf->size();
  84. int nparse = ctx->parser->FeedRecvData(data, len);
  85. if (nparse != len) {
  86. ctx->errorCallback();
  87. channel->close();
  88. return;
  89. }
  90. if (ctx->parser->IsComplete()) {
  91. auto& req = ctx->task->req;
  92. auto& resp = ctx->resp;
  93. bool keepalive = req->IsKeepAlive() && resp->IsKeepAlive();
  94. if (req->redirect && HTTP_STATUS_IS_REDIRECT(resp->status_code)) {
  95. std::string location = resp->headers["Location"];
  96. if (!location.empty()) {
  97. hlogi("redirect %s => %s", req->url.c_str(), location.c_str());
  98. req->url = location;
  99. req->ParseUrl();
  100. req->headers["Host"] = req->host;
  101. resp->Reset();
  102. send(ctx->task);
  103. // NOTE: detatch from original channel->context
  104. ctx->cancelTask();
  105. }
  106. } else {
  107. ctx->successCallback();
  108. }
  109. if (keepalive) {
  110. // NOTE: add into conn_pools to reuse
  111. // hlogd("add into conn_pools");
  112. conn_pools[channel->peeraddr()].add(channel->fd());
  113. } else {
  114. channel->close();
  115. }
  116. }
  117. };
  118. channel->onclose = [this, &channel]() {
  119. HttpClientContext* ctx = channel->getContext<HttpClientContext>();
  120. // NOTE: remove from conn_pools
  121. // hlogd("remove from conn_pools");
  122. auto iter = conn_pools.find(channel->peeraddr());
  123. if (iter != conn_pools.end()) {
  124. iter->second.remove(channel->fd());
  125. }
  126. const HttpClientTaskPtr& task = ctx->task;
  127. if (task) {
  128. if (ctx->parser &&
  129. ctx->parser->IsEof()) {
  130. ctx->successCallback();
  131. }
  132. else if (task->req &&
  133. task->req->cancel == 0 &&
  134. task->req->retry_count-- > 0) {
  135. if (task->req->retry_delay > 0) {
  136. // try again after delay
  137. setTimeout(task->req->retry_delay, [this, task](TimerID timerID){
  138. hlogi("retry %s %s", http_method_str(task->req->method), task->req->url.c_str());
  139. sendInLoop(task);
  140. });
  141. } else {
  142. send(task);
  143. }
  144. }
  145. else {
  146. ctx->errorCallback();
  147. }
  148. }
  149. removeChannel(channel);
  150. };
  151. // timer
  152. if (timeout_ms > 0) {
  153. ctx->timerID = setTimeout(timeout_ms - elapsed_ms, [&channel](TimerID timerID){
  154. HttpClientContext* ctx = channel->getContext<HttpClientContext>();
  155. if (ctx && ctx->task) {
  156. hlogw("%s timeout!", ctx->task->req->url.c_str());
  157. }
  158. if (channel) {
  159. channel->close();
  160. }
  161. });
  162. }
  163. if (channel->isConnected()) {
  164. // sendRequest
  165. sendRequest(channel);
  166. } else {
  167. // startConnect
  168. if (req->connect_timeout > 0) {
  169. channel->setConnectTimeout(req->connect_timeout * 1000);
  170. }
  171. channel->startConnect();
  172. }
  173. return 0;
  174. }
  175. // InitResponse => SubmitRequest => while(GetSendData) write => startRead
  176. int AsyncHttpClient::sendRequest(const SocketChannelPtr& channel) {
  177. HttpClientContext* ctx = (HttpClientContext*)channel->context();
  178. assert(ctx != NULL && ctx->task != NULL);
  179. if (ctx->resp == NULL) {
  180. ctx->resp = std::make_shared<HttpResponse>();
  181. }
  182. HttpRequest* req = ctx->task->req.get();
  183. HttpResponse* resp = ctx->resp.get();
  184. assert(req != NULL && resp != NULL);
  185. if (req->http_cb) resp->http_cb = std::move(req->http_cb);
  186. if (ctx->parser == NULL) {
  187. ctx->parser.reset(HttpParser::New(HTTP_CLIENT, (http_version)req->http_major));
  188. }
  189. ctx->parser->InitResponse(resp);
  190. ctx->parser->SubmitRequest(req);
  191. char* data = NULL;
  192. size_t len = 0;
  193. while (ctx->parser->GetSendData(&data, &len)) {
  194. if (req->cancel) {
  195. channel->close();
  196. return -1;
  197. }
  198. // NOTE: ensure write buffer size is enough
  199. if (len > (1 << 22) /* 4M */) {
  200. channel->setMaxWriteBufsize(len);
  201. }
  202. channel->write(data, len);
  203. }
  204. channel->startRead();
  205. return 0;
  206. }
  207. }