AsyncHttpClient.h 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172
  1. #ifndef HV_ASYNC_HTTP_CLIENT_H_
  2. #define HV_ASYNC_HTTP_CLIENT_H_
  3. #include <list>
  4. #include "EventLoopThread.h"
  5. #include "Channel.h"
  6. #include "HttpMessage.h"
  7. #include "HttpParser.h"
  8. #define DEFAULT_FAIL_RETRY_COUNT 3
  9. #define DEFAULT_FAIL_RETRY_DELAY 1000 // ms
  10. // async => keepalive => connect_pool
  11. namespace hv {
  12. template<typename Conn>
  13. class ConnPool {
  14. public:
  15. int size() {
  16. return conns_.size();
  17. }
  18. bool get(Conn& conn) {
  19. if (conns_.empty()) return false;
  20. conn = conns_.front();
  21. conns_.pop_front();
  22. return true;
  23. }
  24. bool add(const Conn& conn) {
  25. conns_.push_back(conn);
  26. return true;
  27. }
  28. bool remove(const Conn& conn) {
  29. auto iter = conns_.begin();
  30. while (iter != conns_.end()) {
  31. if (*iter == conn) {
  32. iter = conns_.erase(iter);
  33. return true;
  34. } else {
  35. ++iter;
  36. }
  37. }
  38. return false;
  39. }
  40. private:
  41. std::list<Conn> conns_;
  42. };
  43. struct HttpClientTask {
  44. HttpRequestPtr req;
  45. HttpResponseCallback cb;
  46. uint64_t start_time;
  47. int retry_cnt;
  48. int retry_delay;
  49. };
  50. typedef std::shared_ptr<HttpClientTask> HttpClientTaskPtr;
  51. struct HttpClientContext {
  52. HttpClientTaskPtr task;
  53. HttpResponsePtr resp;
  54. HttpParserPtr parser;
  55. TimerID timerID;
  56. HttpClientContext() {
  57. timerID = INVALID_TIMER_ID;
  58. }
  59. ~HttpClientContext() {
  60. if (timerID != INVALID_TIMER_ID) {
  61. killTimer(timerID);
  62. timerID = INVALID_TIMER_ID;
  63. }
  64. }
  65. void callback() {
  66. if (timerID != INVALID_TIMER_ID) {
  67. killTimer(timerID);
  68. timerID = INVALID_TIMER_ID;
  69. }
  70. if (task && task->cb) {
  71. task->cb(resp);
  72. }
  73. // NOTE: task done
  74. task = NULL;
  75. }
  76. void successCallback() {
  77. callback();
  78. resp = NULL;
  79. }
  80. void errorCallback() {
  81. resp = NULL;
  82. callback();
  83. }
  84. };
  85. class AsyncHttpClient {
  86. public:
  87. AsyncHttpClient() {
  88. loop_thread.start(true);
  89. }
  90. ~AsyncHttpClient() {
  91. loop_thread.stop(true);
  92. }
  93. // thread-safe
  94. int send(const HttpRequestPtr& req, HttpResponseCallback resp_cb) {
  95. HttpClientTaskPtr task(new HttpClientTask);
  96. task->req = req;
  97. task->cb = std::move(resp_cb);
  98. task->start_time = hloop_now_hrtime(loop_thread.hloop());
  99. task->retry_cnt = DEFAULT_FAIL_RETRY_COUNT;
  100. task->retry_delay = DEFAULT_FAIL_RETRY_DELAY;
  101. return send(task);
  102. }
  103. int send(const HttpClientTaskPtr& task) {
  104. loop_thread.loop()->queueInLoop(std::bind(&AsyncHttpClient::sendInLoop, this, task));
  105. return 0;
  106. }
  107. protected:
  108. void sendInLoop(HttpClientTaskPtr task) {
  109. int err = doTask(task);
  110. if (err != 0 && task->cb) {
  111. task->cb(NULL);
  112. }
  113. }
  114. int doTask(const HttpClientTaskPtr& task);
  115. static int sendRequest(const SocketChannelPtr& channel);
  116. // channel
  117. const SocketChannelPtr& getChannel(int fd) {
  118. return channels[fd];
  119. // return fd < channels.capacity() ? channels[fd] : NULL;
  120. }
  121. const SocketChannelPtr& addChannel(hio_t* io) {
  122. SocketChannelPtr channel(new SocketChannel(io));
  123. channel->newContext<HttpClientContext>();
  124. int fd = channel->fd();
  125. channels[fd] = channel;
  126. return channels[fd];
  127. }
  128. void removeChannel(const SocketChannelPtr& channel) {
  129. channel->deleteContext<HttpClientContext>();
  130. int fd = channel->fd();
  131. channels.erase(fd);
  132. }
  133. private:
  134. EventLoopThread loop_thread;
  135. // NOTE: just one loop thread, no need mutex.
  136. // fd => SocketChannelPtr
  137. std::map<int, SocketChannelPtr> channels;
  138. // peeraddr => ConnPool
  139. std::map<std::string, ConnPool<int>> conn_pools;
  140. };
  141. }
  142. #endif // HV_ASYNC_HTTP_CLIENT_H_