|
@@ -2,10 +2,11 @@
|
|
|
|
|
|
|
|
namespace hv {
|
|
namespace hv {
|
|
|
|
|
|
|
|
-int AsyncHttpClient::sendInLoopImpl(const HttpRequestPtr& req, HttpResponseCallback resp_cb, uint64_t start_hrtime) {
|
|
|
|
|
|
|
+int AsyncHttpClient::doTask(const HttpClientTaskPtr& task) {
|
|
|
|
|
+ const HttpRequestPtr& req = task->req;
|
|
|
// queueInLoop timeout?
|
|
// queueInLoop timeout?
|
|
|
uint64_t now_hrtime = hloop_now_hrtime(loop_thread.hloop());
|
|
uint64_t now_hrtime = hloop_now_hrtime(loop_thread.hloop());
|
|
|
- int elapsed_ms = (now_hrtime - start_hrtime) / 1000;
|
|
|
|
|
|
|
+ int elapsed_ms = (now_hrtime - task->start_time) / 1000;
|
|
|
int timeout_ms = req->timeout * 1000;
|
|
int timeout_ms = req->timeout * 1000;
|
|
|
if (timeout_ms > 0 && elapsed_ms >= timeout_ms) {
|
|
if (timeout_ms > 0 && elapsed_ms >= timeout_ms) {
|
|
|
hlogw("%s queueInLoop timeout!", req->url.c_str());
|
|
hlogw("%s queueInLoop timeout!", req->url.c_str());
|
|
@@ -34,7 +35,7 @@ int AsyncHttpClient::sendInLoopImpl(const HttpRequestPtr& req, HttpResponseCallb
|
|
|
// hlogd("get from conn_pools");
|
|
// hlogd("get from conn_pools");
|
|
|
ctx = getContext(connfd);
|
|
ctx = getContext(connfd);
|
|
|
ctx->req = req;
|
|
ctx->req = req;
|
|
|
- ctx->cb = resp_cb;
|
|
|
|
|
|
|
+ ctx->cb = task->cb;
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
|
|
@@ -58,7 +59,7 @@ int AsyncHttpClient::sendInLoopImpl(const HttpRequestPtr& req, HttpResponseCallb
|
|
|
// new HttpClientContext
|
|
// new HttpClientContext
|
|
|
ctx.reset(new HttpClientContext);
|
|
ctx.reset(new HttpClientContext);
|
|
|
ctx->req = req;
|
|
ctx->req = req;
|
|
|
- ctx->cb = resp_cb;
|
|
|
|
|
|
|
+ ctx->cb = task->cb;
|
|
|
ctx->channel.reset(new SocketChannel(connio));
|
|
ctx->channel.reset(new SocketChannel(connio));
|
|
|
ctx->channel->onread = [this, ctx](Buffer* buf) {
|
|
ctx->channel->onread = [this, ctx](Buffer* buf) {
|
|
|
const char* data = (const char*)buf->data();
|
|
const char* data = (const char*)buf->data();
|
|
@@ -83,10 +84,15 @@ int AsyncHttpClient::sendInLoopImpl(const HttpRequestPtr& req, HttpResponseCallb
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
};
|
|
};
|
|
|
- ctx->channel->onclose = [this, ctx]() {
|
|
|
|
|
|
|
+ ctx->channel->onclose = [this, ctx, task]() {
|
|
|
ctx->channel->status = SocketChannel::CLOSED;
|
|
ctx->channel->status = SocketChannel::CLOSED;
|
|
|
removeContext(ctx);
|
|
removeContext(ctx);
|
|
|
- ctx->errorCallback();
|
|
|
|
|
|
|
+ if (task->retry_cnt-- > 0) {
|
|
|
|
|
+ // try again
|
|
|
|
|
+ send(task);
|
|
|
|
|
+ } else {
|
|
|
|
|
+ ctx->errorCallback();
|
|
|
|
|
+ }
|
|
|
};
|
|
};
|
|
|
addContext(ctx);
|
|
addContext(ctx);
|
|
|
}
|
|
}
|