// AsyncHttpClient.cpp
  1. #include "AsyncHttpClient.h"
  2. namespace hv {
  3. // createsocket => startConnect =>
  4. // onconnect => sendRequest => startRead =>
  5. // onread => HttpParser => resp_cb
  6. int AsyncHttpClient::doTask(const HttpClientTaskPtr& task) {
  7. const HttpRequestPtr& req = task->req;
  8. // queueInLoop timeout?
  9. uint64_t now_hrtime = hloop_now_hrtime(loop_thread.hloop());
  10. int elapsed_ms = (now_hrtime - task->start_time) / 1000;
  11. int timeout_ms = req->timeout * 1000;
  12. if (timeout_ms > 0 && elapsed_ms >= timeout_ms) {
  13. hlogw("%s queueInLoop timeout!", req->url.c_str());
  14. return -10;
  15. }
  16. req->ParseUrl();
  17. sockaddr_u peeraddr;
  18. memset(&peeraddr, 0, sizeof(peeraddr));
  19. int ret = sockaddr_set_ipport(&peeraddr, req->host.c_str(), req->port);
  20. if (ret != 0) {
  21. hloge("unknown host %s", req->host.c_str());
  22. return -20;
  23. }
  24. int connfd = -1;
  25. // first get from conn_pools
  26. char strAddr[SOCKADDR_STRLEN] = {0};
  27. SOCKADDR_STR(&peeraddr, strAddr);
  28. auto iter = conn_pools.find(strAddr);
  29. if (iter != conn_pools.end()) {
  30. // hlogd("get from conn_pools");
  31. iter->second.get(connfd);
  32. }
  33. if (connfd < 0) {
  34. // create socket
  35. connfd = socket(peeraddr.sa.sa_family, SOCK_STREAM, 0);
  36. if (connfd < 0) {
  37. perror("socket");
  38. return -30;
  39. }
  40. hio_t* connio = hio_get(loop_thread.hloop(), connfd);
  41. assert(connio != NULL);
  42. hio_set_peeraddr(connio, &peeraddr.sa, sockaddr_len(&peeraddr));
  43. addChannel(connio);
  44. // https
  45. if (strcmp(req->scheme.c_str(), "https") == 0) {
  46. hio_enable_ssl(connio);
  47. }
  48. }
  49. const SocketChannelPtr& channel = getChannel(connfd);
  50. assert(channel != NULL);
  51. HttpClientContext* ctx = channel->getContext<HttpClientContext>();
  52. ctx->task = task;
  53. channel->onconnect = [&channel]() {
  54. sendRequest(channel);
  55. };
  56. channel->onread = [this, &channel](Buffer* buf) {
  57. HttpClientContext* ctx = channel->getContext<HttpClientContext>();
  58. if (ctx->task == NULL) return;
  59. const char* data = (const char*)buf->data();
  60. int len = buf->size();
  61. int nparse = ctx->parser->FeedRecvData(data, len);
  62. if (nparse != len) {
  63. ctx->errorCallback();
  64. channel->close();
  65. return;
  66. }
  67. if (ctx->parser->IsComplete()) {
  68. bool keepalive = ctx->task->req->IsKeepAlive() && ctx->resp->IsKeepAlive();
  69. ctx->successCallback();
  70. if (keepalive) {
  71. // NOTE: add into conn_pools to reuse
  72. // hlogd("add into conn_pools");
  73. conn_pools[channel->peeraddr()].add(channel->fd());
  74. } else {
  75. channel->close();
  76. }
  77. }
  78. };
  79. channel->onclose = [this, &channel]() {
  80. HttpClientContext* ctx = channel->getContext<HttpClientContext>();
  81. // NOTE: remove from conn_pools
  82. // hlogd("remove from conn_pools");
  83. auto iter = conn_pools.find(channel->peeraddr());
  84. if (iter != conn_pools.end()) {
  85. iter->second.remove(channel->fd());
  86. }
  87. const HttpClientTaskPtr& task = ctx->task;
  88. if (task && task->retry_cnt-- > 0) {
  89. if (task->retry_delay) {
  90. // try again after delay
  91. setTimeout(ctx->task->retry_delay, [this, task](TimerID timerID){
  92. hlogi("retry %s %s", http_method_str(task->req->method), task->req->url.c_str());
  93. sendInLoop(task);
  94. });
  95. } else {
  96. send(task);
  97. }
  98. } else {
  99. ctx->errorCallback();
  100. }
  101. removeChannel(channel);
  102. };
  103. // timer
  104. if (timeout_ms > 0) {
  105. ctx->timerID = setTimeout(timeout_ms - elapsed_ms, [&channel](TimerID timerID){
  106. HttpClientContext* ctx = channel->getContext<HttpClientContext>();
  107. assert(ctx->task != NULL);
  108. hlogw("%s timeout!", ctx->task->req->url.c_str());
  109. if (channel) {
  110. channel->close();
  111. }
  112. });
  113. }
  114. if (channel->isConnected()) {
  115. // sendRequest
  116. sendRequest(channel);
  117. } else {
  118. // startConnect
  119. channel->startConnect();
  120. }
  121. return 0;
  122. }
  123. // InitResponse => SubmitRequest => while(GetSendData) write => startRead
  124. int AsyncHttpClient::sendRequest(const SocketChannelPtr& channel) {
  125. HttpClientContext* ctx = (HttpClientContext*)channel->context();
  126. assert(ctx != NULL && ctx->task != NULL);
  127. HttpRequest* req = ctx->task->req.get();
  128. HttpResponse* resp = ctx->resp.get();
  129. if (ctx->parser == NULL) {
  130. ctx->parser.reset(HttpParser::New(HTTP_CLIENT, (http_version)ctx->task->req->http_major));
  131. }
  132. if (resp == NULL) {
  133. resp = new HttpResponse;
  134. ctx->resp.reset(resp);
  135. }
  136. if (req->head_cb) resp->head_cb = std::move(req->head_cb);
  137. if (req->body_cb) resp->body_cb = std::move(req->body_cb);
  138. if (req->chunked_cb) resp->chunked_cb = std::move(req->chunked_cb);
  139. ctx->parser->InitResponse(resp);
  140. ctx->parser->SubmitRequest(req);
  141. char* data = NULL;
  142. size_t len = 0;
  143. while (ctx->parser->GetSendData(&data, &len)) {
  144. channel->write(data, len);
  145. }
  146. channel->startRead();
  147. return 0;
  148. }
  149. }