AsyncHttpClient.h 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173
  1. #ifndef HV_ASYNC_HTTP_CLIENT_H_
  2. #define HV_ASYNC_HTTP_CLIENT_H_
  3. #include <list>
  4. #include "EventLoopThread.h"
  5. #include "Channel.h"
  6. #include "HttpMessage.h"
  7. #include "HttpParser.h"
  8. #define DEFAULT_FAIL_RETRY_COUNT 3
  9. #define DEFAULT_FAIL_RETRY_DELAY 1000 // ms
  10. // async => keepalive => connect_pool
  11. namespace hv {
  12. template<typename Conn>
  13. class ConnPool {
  14. public:
  15. int size() {
  16. return conns_.size();
  17. }
  18. bool get(Conn& conn) {
  19. if (conns_.empty()) return false;
  20. conn = conns_.front();
  21. conns_.pop_front();
  22. return true;
  23. }
  24. bool add(const Conn& conn) {
  25. conns_.push_back(conn);
  26. return true;
  27. }
  28. bool remove(const Conn& conn) {
  29. auto iter = conns_.begin();
  30. while (iter != conns_.end()) {
  31. if (*iter == conn) {
  32. iter = conns_.erase(iter);
  33. return true;
  34. } else {
  35. ++iter;
  36. }
  37. }
  38. return false;
  39. }
  40. private:
  41. std::list<Conn> conns_;
  42. };
  43. struct HttpClientTask {
  44. HttpRequestPtr req;
  45. HttpResponseCallback cb;
  46. uint64_t start_time;
  47. int retry_cnt;
  48. int retry_delay;
  49. };
  50. typedef std::shared_ptr<HttpClientTask> HttpClientTaskPtr;
  51. struct HttpClientContext {
  52. HttpClientTaskPtr task;
  53. HttpResponsePtr resp;
  54. HttpParserPtr parser;
  55. TimerID timerID;
  56. HttpClientContext() {
  57. timerID = INVALID_TIMER_ID;
  58. }
  59. ~HttpClientContext() {
  60. if (timerID != INVALID_TIMER_ID) {
  61. killTimer(timerID);
  62. timerID = INVALID_TIMER_ID;
  63. }
  64. }
  65. void callback() {
  66. if (timerID != INVALID_TIMER_ID) {
  67. killTimer(timerID);
  68. timerID = INVALID_TIMER_ID;
  69. }
  70. if (task && task->cb) {
  71. task->cb(resp);
  72. }
  73. // NOTE: task done
  74. task = NULL;
  75. }
  76. void successCallback() {
  77. callback();
  78. resp = NULL;
  79. }
  80. void errorCallback() {
  81. resp = NULL;
  82. callback();
  83. }
  84. };
  85. class AsyncHttpClient {
  86. public:
  87. AsyncHttpClient() {
  88. loop_thread.start(true);
  89. }
  90. ~AsyncHttpClient() {
  91. // NOTE: ~EventLoopThread will stop and join
  92. // loop_thread.stop(true);
  93. }
  94. // thread-safe
  95. int send(const HttpRequestPtr& req, HttpResponseCallback resp_cb) {
  96. HttpClientTaskPtr task(new HttpClientTask);
  97. task->req = req;
  98. task->cb = std::move(resp_cb);
  99. task->start_time = hloop_now_hrtime(loop_thread.hloop());
  100. task->retry_delay = DEFAULT_FAIL_RETRY_DELAY;
  101. task->retry_cnt = MIN(DEFAULT_FAIL_RETRY_COUNT, req->timeout * 1000 / task->retry_delay - 1);
  102. return send(task);
  103. }
  104. int send(const HttpClientTaskPtr& task) {
  105. loop_thread.loop()->queueInLoop(std::bind(&AsyncHttpClient::sendInLoop, this, task));
  106. return 0;
  107. }
  108. protected:
  109. void sendInLoop(HttpClientTaskPtr task) {
  110. int err = doTask(task);
  111. if (err != 0 && task->cb) {
  112. task->cb(NULL);
  113. }
  114. }
  115. int doTask(const HttpClientTaskPtr& task);
  116. static int sendRequest(const SocketChannelPtr& channel);
  117. // channel
  118. const SocketChannelPtr& getChannel(int fd) {
  119. return channels[fd];
  120. // return fd < channels.capacity() ? channels[fd] : NULL;
  121. }
  122. const SocketChannelPtr& addChannel(hio_t* io) {
  123. SocketChannelPtr channel(new SocketChannel(io));
  124. channel->newContext<HttpClientContext>();
  125. int fd = channel->fd();
  126. channels[fd] = channel;
  127. return channels[fd];
  128. }
  129. void removeChannel(const SocketChannelPtr& channel) {
  130. channel->deleteContext<HttpClientContext>();
  131. int fd = channel->fd();
  132. channels.erase(fd);
  133. }
  134. private:
  135. // NOTE: just one loop thread, no need mutex.
  136. // fd => SocketChannelPtr
  137. std::map<int, SocketChannelPtr> channels;
  138. // peeraddr => ConnPool
  139. std::map<std::string, ConnPool<int>> conn_pools;
  140. EventLoopThread loop_thread;
  141. };
  142. }
  143. #endif // HV_ASYNC_HTTP_CLIENT_H_