| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156 |
- #include "AsyncHttpClient.h"
- namespace hv {
- int AsyncHttpClient::doTask(const HttpClientTaskPtr& task) {
- const HttpRequestPtr& req = task->req;
- // queueInLoop timeout?
- uint64_t now_hrtime = hloop_now_hrtime(loop_thread.hloop());
- int elapsed_ms = (now_hrtime - task->start_time) / 1000;
- int timeout_ms = req->timeout * 1000;
- if (timeout_ms > 0 && elapsed_ms >= timeout_ms) {
- hlogw("%s queueInLoop timeout!", req->url.c_str());
- return -10;
- }
- req->ParseUrl();
- sockaddr_u peeraddr;
- memset(&peeraddr, 0, sizeof(peeraddr));
- int ret = sockaddr_set_ipport(&peeraddr, req->host.c_str(), req->port);
- if (ret != 0) {
- hloge("unknown host %s", req->host.c_str());
- return -20;
- }
- int connfd = -1;
- hio_t* connio = NULL;
- HttpClientContextPtr ctx = NULL;
- // first get from conn_pools
- char strAddr[SOCKADDR_STRLEN] = {0};
- SOCKADDR_STR(&peeraddr, strAddr);
- auto iter = conn_pools.find(strAddr);
- if (iter != conn_pools.end()) {
- if (iter->second.get(connfd)) {
- // hlogd("get from conn_pools");
- ctx = getContext(connfd);
- }
- }
- if (connfd < 0) {
- // create socket
- connfd = socket(peeraddr.sa.sa_family, SOCK_STREAM, 0);
- if (connfd < 0) {
- perror("socket");
- return -30;
- }
- connio = hio_get(loop_thread.hloop(), connfd);
- assert(connio != NULL);
- hio_set_peeraddr(connio, &peeraddr.sa, sockaddr_len(&peeraddr));
- // https
- if (req->https) {
- hio_enable_ssl(connio);
- }
- }
- if (ctx == NULL) {
- // new HttpClientContext
- ctx.reset(new HttpClientContext);
- ctx->channel.reset(new SocketChannel(connio));
- addContext(ctx);
- }
- ctx->req = req;
- ctx->cb = task->cb;
- ctx->channel->onread = [this, ctx](Buffer* buf) {
- const char* data = (const char*)buf->data();
- int len = buf->size();
- int nparse = ctx->parser->FeedRecvData(data, len);
- if (nparse != len) {
- ctx->errorCallback();
- ctx->channel->close();
- return;
- }
- if (ctx->parser->IsComplete()) {
- std::string req_connection = ctx->req->GetHeader("Connection");
- std::string resp_connection = ctx->resp->GetHeader("Connection");
- ctx->successCallback();
- if (stricmp(req_connection.c_str(), "keep-alive") == 0 &&
- stricmp(resp_connection.c_str(), "keep-alive") == 0) {
- // add into conn_pools to reuse
- // hlogd("add into conn_pools");
- conn_pools[ctx->channel->peeraddr()].add(ctx->channel->fd());
- } else {
- ctx->channel->close();
- }
- }
- };
- ctx->channel->onclose = [this, ctx, task]() {
- ctx->channel->status = SocketChannel::CLOSED;
- removeContext(ctx);
- if (task->retry_cnt-- > 0) {
- // try again
- send(task);
- } else {
- ctx->errorCallback();
- }
- };
- // timer
- if (timeout_ms > 0) {
- ctx->timerID = setTimeout(timeout_ms - elapsed_ms, [ctx](TimerID timerID){
- hlogw("%s timeout!", ctx->req->url.c_str());
- if (ctx->channel) {
- ctx->channel->close();
- }
- });
- }
- if (ctx->channel->isConnected()) {
- // sendRequest
- sendRequest(ctx);
- } else {
- // startConnect
- hevent_set_userdata(connio, this);
- hio_setcb_connect(connio, onconnect);
- hio_connect(connio);
- }
- return 0;
- }
- void AsyncHttpClient::onconnect(hio_t* io) {
- AsyncHttpClient* client = (AsyncHttpClient*)hevent_userdata(io);
- HttpClientContextPtr ctx = client->getContext(hio_fd(io));
- assert(ctx != NULL && ctx->req != NULL && ctx->channel != NULL);
- ctx->channel->status = SocketChannel::CONNECTED;
- client->sendRequest(ctx);
- ctx->channel->startRead();
- }
- int AsyncHttpClient::sendRequest(const HttpClientContextPtr ctx) {
- assert(ctx != NULL && ctx->req != NULL && ctx->channel != NULL);
- SocketChannelPtr channel = ctx->channel;
- if (ctx->parser == NULL) {
- ctx->parser.reset(HttpParser::New(HTTP_CLIENT, (http_version)ctx->req->http_major));
- }
- if (ctx->resp == NULL) {
- ctx->resp.reset(new HttpResponse);
- }
- ctx->parser->InitResponse(ctx->resp.get());
- ctx->parser->SubmitRequest(ctx->req.get());
- char* data = NULL;
- size_t len = 0;
- while (ctx->parser->GetSendData(&data, &len)) {
- Buffer buf(data, len);
- channel->write(&buf);
- }
- return 0;
- }
- }
|