|
|
@@ -7,6 +7,10 @@ namespace hv {
|
|
|
// onread => HttpParser => resp_cb
|
|
|
int AsyncHttpClient::doTask(const HttpClientTaskPtr& task) {
|
|
|
const HttpRequestPtr& req = task->req;
|
|
|
+ if (req->cancel) {
|
|
|
+ return -1;
|
|
|
+ }
|
|
|
+
|
|
|
// queueInLoop timeout?
|
|
|
uint64_t now_hrtime = hloop_now_hrtime(EventLoopThread::hloop());
|
|
|
int elapsed_ms = (now_hrtime - task->start_time) / 1000;
|
|
|
@@ -66,6 +70,10 @@ int AsyncHttpClient::doTask(const HttpClientTaskPtr& task) {
|
|
|
channel->onread = [this, &channel](Buffer* buf) {
|
|
|
HttpClientContext* ctx = channel->getContext<HttpClientContext>();
|
|
|
if (ctx->task == NULL) return;
|
|
|
+ if (ctx->task->req->cancel) {
|
|
|
+ channel->close();
|
|
|
+ return;
|
|
|
+ }
|
|
|
const char* data = (const char*)buf->data();
|
|
|
int len = buf->size();
|
|
|
int nparse = ctx->parser->FeedRecvData(data, len);
|
|
|
@@ -113,10 +121,13 @@ int AsyncHttpClient::doTask(const HttpClientTaskPtr& task) {
|
|
|
|
|
|
const HttpClientTaskPtr& task = ctx->task;
|
|
|
if (task) {
|
|
|
- if (ctx->parser && ctx->parser->IsEof()) {
|
|
|
+ if (ctx->parser &&
|
|
|
+ ctx->parser->IsEof()) {
|
|
|
ctx->successCallback();
|
|
|
}
|
|
|
- else if (task->req && task->req->retry_count-- > 0) {
|
|
|
+ else if (task->req &&
|
|
|
+ task->req->cancel == 0 &&
|
|
|
+ task->req->retry_count-- > 0) {
|
|
|
if (task->req->retry_delay > 0) {
|
|
|
// try again after delay
|
|
|
setTimeout(task->req->retry_delay, [this, task](TimerID timerID){
|
|
|
@@ -183,6 +194,10 @@ int AsyncHttpClient::sendRequest(const SocketChannelPtr& channel) {
|
|
|
char* data = NULL;
|
|
|
size_t len = 0;
|
|
|
while (ctx->parser->GetSendData(&data, &len)) {
|
|
|
+ if (req->cancel) {
|
|
|
+ channel->close();
|
|
|
+ return -1;
|
|
|
+ }
|
|
|
// NOTE: ensure write buffer size is enough
|
|
|
if (len > (1 << 22) /* 4M */) {
|
|
|
channel->setMaxWriteBufsize(len);
|