1
0

AsyncHttpClient.cpp 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157
  1. #include "AsyncHttpClient.h"
  2. namespace hv {
  3. int AsyncHttpClient::doTask(const HttpClientTaskPtr& task) {
  4. const HttpRequestPtr& req = task->req;
  5. // queueInLoop timeout?
  6. uint64_t now_hrtime = hloop_now_hrtime(loop_thread.hloop());
  7. int elapsed_ms = (now_hrtime - task->start_time) / 1000;
  8. int timeout_ms = req->timeout * 1000;
  9. if (timeout_ms > 0 && elapsed_ms >= timeout_ms) {
  10. hlogw("%s queueInLoop timeout!", req->url.c_str());
  11. return -10;
  12. }
  13. req->ParseUrl();
  14. sockaddr_u peeraddr;
  15. memset(&peeraddr, 0, sizeof(peeraddr));
  16. int ret = sockaddr_set_ipport(&peeraddr, req->host.c_str(), req->port);
  17. if (ret != 0) {
  18. hloge("unknown host %s", req->host.c_str());
  19. return -20;
  20. }
  21. int connfd = -1;
  22. hio_t* connio = NULL;
  23. HttpClientContextPtr ctx = NULL;
  24. // first get from conn_pools
  25. char strAddr[SOCKADDR_STRLEN] = {0};
  26. SOCKADDR_STR(&peeraddr, strAddr);
  27. auto iter = conn_pools.find(strAddr);
  28. if (iter != conn_pools.end()) {
  29. if (iter->second.get(connfd)) {
  30. // hlogd("get from conn_pools");
  31. ctx = getContext(connfd);
  32. ctx->req = req;
  33. ctx->cb = task->cb;
  34. }
  35. }
  36. if (connfd < 0) {
  37. // create socket
  38. connfd = socket(peeraddr.sa.sa_family, SOCK_STREAM, 0);
  39. if (connfd < 0) {
  40. perror("socket");
  41. return -30;
  42. }
  43. connio = hio_get(loop_thread.hloop(), connfd);
  44. assert(connio != NULL);
  45. hio_set_peeraddr(connio, &peeraddr.sa, sockaddr_len(&peeraddr));
  46. // https
  47. if (req->https) {
  48. hio_enable_ssl(connio);
  49. }
  50. }
  51. if (ctx == NULL) {
  52. // new HttpClientContext
  53. ctx.reset(new HttpClientContext);
  54. ctx->req = req;
  55. ctx->cb = task->cb;
  56. ctx->channel.reset(new SocketChannel(connio));
  57. ctx->channel->onread = [this, ctx](Buffer* buf) {
  58. const char* data = (const char*)buf->data();
  59. int len = buf->size();
  60. int nparse = ctx->parser->FeedRecvData(data, len);
  61. if (nparse != len) {
  62. ctx->errorCallback();
  63. ctx->channel->close();
  64. return;
  65. }
  66. if (ctx->parser->IsComplete()) {
  67. std::string req_connection = ctx->req->GetHeader("Connection");
  68. std::string resp_connection = ctx->resp->GetHeader("Connection");
  69. ctx->successCallback();
  70. if (stricmp(req_connection.c_str(), "keep-alive") == 0 &&
  71. stricmp(resp_connection.c_str(), "keep-alive") == 0) {
  72. // add into conn_pools to reuse
  73. // hlogd("add into conn_pools");
  74. conn_pools[ctx->channel->peeraddr()].add(ctx->channel->fd());
  75. } else {
  76. ctx->channel->close();
  77. }
  78. }
  79. };
  80. ctx->channel->onclose = [this, ctx, task]() {
  81. ctx->channel->status = SocketChannel::CLOSED;
  82. removeContext(ctx);
  83. if (task->retry_cnt-- > 0) {
  84. // try again
  85. send(task);
  86. } else {
  87. ctx->errorCallback();
  88. }
  89. };
  90. addContext(ctx);
  91. }
  92. // timer
  93. if (timeout_ms > 0) {
  94. ctx->timerID = setTimeout(timeout_ms - elapsed_ms, [ctx](TimerID timerID){
  95. hlogw("%s timeout!", ctx->req->url.c_str());
  96. if (ctx->channel) {
  97. ctx->channel->close();
  98. }
  99. });
  100. }
  101. if (ctx->channel->isConnected()) {
  102. // sendRequest
  103. sendRequest(ctx);
  104. } else {
  105. // startConnect
  106. hevent_set_userdata(connio, this);
  107. hio_setcb_connect(connio, onconnect);
  108. hio_connect(connio);
  109. }
  110. return 0;
  111. }
  112. void AsyncHttpClient::onconnect(hio_t* io) {
  113. AsyncHttpClient* client = (AsyncHttpClient*)hevent_userdata(io);
  114. HttpClientContextPtr ctx = client->getContext(hio_fd(io));
  115. assert(ctx != NULL && ctx->req != NULL && ctx->channel != NULL);
  116. ctx->channel->status = SocketChannel::CONNECTED;
  117. client->sendRequest(ctx);
  118. ctx->channel->startRead();
  119. }
  120. int AsyncHttpClient::sendRequest(const HttpClientContextPtr ctx) {
  121. assert(ctx != NULL && ctx->req != NULL && ctx->channel != NULL);
  122. SocketChannelPtr channel = ctx->channel;
  123. if (ctx->parser == NULL) {
  124. ctx->parser.reset(HttpParser::New(HTTP_CLIENT, (http_version)ctx->req->http_major));
  125. }
  126. if (ctx->resp == NULL) {
  127. ctx->resp.reset(new HttpResponse);
  128. }
  129. ctx->parser->InitResponse(ctx->resp.get());
  130. ctx->parser->SubmitRequest(ctx->req.get());
  131. char* data = NULL;
  132. size_t len = 0;
  133. while (ctx->parser->GetSendData(&data, &len)) {
  134. Buffer buf(data, len);
  135. channel->write(&buf);
  136. }
  137. return 0;
  138. }
  139. }