// UdpClient.h (3.2 KB)
  1. #ifndef HV_UDP_CLIENT_HPP_
  2. #define HV_UDP_CLIENT_HPP_
  3. #include "hsocket.h"
  4. #include "EventLoopThread.h"
  5. #include "Channel.h"
  6. namespace hv {
  7. template<class TSocketChannel = SocketChannel>
  8. class UdpClientTmpl {
  9. public:
  10. typedef std::shared_ptr<TSocketChannel> TSocketChannelPtr;
  11. UdpClientTmpl() {
  12. #if WITH_KCP
  13. enable_kcp = false;
  14. #endif
  15. }
  16. virtual ~UdpClientTmpl() {
  17. }
  18. const EventLoopPtr& loop() {
  19. return loop_thread.loop();
  20. }
  21. //NOTE: By default, not bind local port. If necessary, you can call system api bind() after createsocket().
  22. //@retval >=0 sockfd, <0 error
  23. int createsocket(int remote_port, const char* remote_host = "127.0.0.1") {
  24. hio_t* io = hloop_create_udp_client(loop_thread.hloop(), remote_host, remote_port);
  25. if (io == NULL) return -1;
  26. channel.reset(new TSocketChannel(io));
  27. return channel->fd();
  28. }
  29. // closesocket thread-safe
  30. void closesocket() {
  31. if (channel) {
  32. channel->close(true);
  33. }
  34. }
  35. int startRecv() {
  36. assert(channel != NULL);
  37. channel->onread = [this](Buffer* buf) {
  38. if (onMessage) {
  39. onMessage(channel, buf);
  40. }
  41. };
  42. channel->onwrite = [this](Buffer* buf) {
  43. if (onWriteComplete) {
  44. onWriteComplete(channel, buf);
  45. }
  46. };
  47. #if WITH_KCP
  48. if (enable_kcp) {
  49. hio_set_kcp(channel->io(), &kcp_setting);
  50. }
  51. #endif
  52. return channel->startRead();
  53. }
  54. void start(bool wait_threads_started = true) {
  55. loop_thread.start(wait_threads_started, std::bind(&UdpClientTmpl::startRecv, this));
  56. }
  57. // stop thread-safe
  58. void stop(bool wait_threads_stopped = true) {
  59. loop_thread.stop(wait_threads_stopped);
  60. }
  61. // sendto thread-safe
  62. int sendto(const void* data, int size, struct sockaddr* peeraddr = NULL) {
  63. if (channel == NULL) return -1;
  64. std::lock_guard<std::mutex> locker(sendto_mutex);
  65. if (peeraddr) hio_set_peeraddr(channel->io(), peeraddr, SOCKADDR_LEN(peeraddr));
  66. return channel->write(data, size);
  67. }
  68. int sendto(Buffer* buf, struct sockaddr* peeraddr = NULL) {
  69. return sendto(buf->data(), buf->size(), peeraddr);
  70. }
  71. int sendto(const std::string& str, struct sockaddr* peeraddr = NULL) {
  72. return sendto(str.data(), str.size(), peeraddr);
  73. }
  74. #if WITH_KCP
  75. void setKcp(kcp_setting_t* setting) {
  76. if (setting) {
  77. enable_kcp = true;
  78. kcp_setting = *setting;
  79. } else {
  80. enable_kcp = false;
  81. }
  82. }
  83. #endif
  84. public:
  85. TSocketChannelPtr channel;
  86. #if WITH_KCP
  87. bool enable_kcp;
  88. kcp_setting_t kcp_setting;
  89. #endif
  90. // Callback
  91. std::function<void(const TSocketChannelPtr&, Buffer*)> onMessage;
  92. // NOTE: Use Channel::isWriteComplete in onWriteComplete callback to determine whether all data has been written.
  93. std::function<void(const TSocketChannelPtr&, Buffer*)> onWriteComplete;
  94. private:
  95. std::mutex sendto_mutex;
  96. EventLoopThread loop_thread;
  97. };
  98. typedef UdpClientTmpl<SocketChannel> UdpClient;
  99. }
  100. #endif // HV_UDP_CLIENT_HPP_