hkcp.c 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151
  1. #include "hkcp.h"
  2. #if WITH_KCP
  3. #include "hevent.h"
  4. #include "hlog.h"
  5. #include "hthread.h"
  // Fallback KCP settings: hio_get_kcp() points io->kcp_setting here when the
  // caller never installed one. As a file-scope static it starts out all-zero,
  // so hio_get_kcp() skips every optional tuning call (each is guarded by a
  // "> 0" test) and uses DEFAULT_KCP_UPDATE_INTERVAL for the update timer.
  6. static kcp_setting_t s_kcp_setting;
  7. static int __kcp_output(const char* buf, int len, ikcpcb* ikcp, void* userdata) {
  8. // printf("ikcp_output len=%d\n", len);
  9. rudp_entry_t* rudp = (rudp_entry_t*)userdata;
  10. assert(rudp != NULL && rudp->io != NULL);
  11. int nsend = sendto(rudp->io->fd, buf, len, 0, &rudp->addr.sa, SOCKADDR_LEN(&rudp->addr));
  12. // printf("sendto nsend=%d\n", nsend);
  13. return nsend;
  14. }
  15. static void __kcp_update_timer_cb(htimer_t* timer) {
  16. rudp_entry_t* rudp = (rudp_entry_t*)timer->privdata;
  17. assert(rudp != NULL && rudp->io != NULL && rudp->kcp.ikcp != NULL);
  18. ikcp_update(rudp->kcp.ikcp, (IUINT32)(rudp->io->loop->cur_hrtime / 1000));
  19. }
  20. void kcp_release(kcp_t* kcp) {
  21. if (kcp->ikcp == NULL) return;
  22. if (kcp->update_timer) {
  23. htimer_del(kcp->update_timer);
  24. kcp->update_timer = NULL;
  25. }
  26. HV_FREE(kcp->readbuf.base);
  27. kcp->readbuf.len = 0;
  28. // printf("ikcp_release ikcp=%p\n", kcp->ikcp);
  29. ikcp_release(kcp->ikcp);
  30. kcp->ikcp = NULL;
  31. }
  32. int hio_set_kcp(hio_t* io, kcp_setting_t* setting) {
  33. io->io_type = HIO_TYPE_KCP;
  34. io->kcp_setting = setting;
  35. return 0;
  36. }
  37. kcp_t* hio_get_kcp(hio_t* io, uint32_t conv, struct sockaddr* addr) {
  38. rudp_entry_t* rudp = hio_get_rudp(io, addr);
  39. assert(rudp != NULL);
  40. kcp_t* kcp = &rudp->kcp;
  41. if (kcp->ikcp != NULL) return kcp;
  42. if (io->kcp_setting == NULL) {
  43. io->kcp_setting = &s_kcp_setting;
  44. }
  45. kcp_setting_t* setting = io->kcp_setting;
  46. kcp->ikcp = ikcp_create(conv, rudp);
  47. // printf("ikcp_create conv=%u ikcp=%p\n", conv, kcp->ikcp);
  48. kcp->ikcp->output = __kcp_output;
  49. kcp->conv = conv;
  50. if (setting->interval > 0) {
  51. ikcp_nodelay(kcp->ikcp, setting->nodelay, setting->interval, setting->fastresend, setting->nocwnd);
  52. }
  53. if (setting->sndwnd > 0 && setting->rcvwnd > 0) {
  54. ikcp_wndsize(kcp->ikcp, setting->sndwnd, setting->rcvwnd);
  55. }
  56. if (setting->mtu > 0) {
  57. ikcp_setmtu(kcp->ikcp, setting->mtu);
  58. }
  59. if (kcp->update_timer == NULL) {
  60. int update_interval = setting->update_interval;
  61. if (update_interval == 0) {
  62. update_interval = DEFAULT_KCP_UPDATE_INTERVAL;
  63. }
  64. kcp->update_timer = htimer_add(io->loop, __kcp_update_timer_cb, update_interval, INFINITE);
  65. kcp->update_timer->privdata = rudp;
  66. }
  67. // NOTE: alloc kcp->readbuf when hio_read_kcp
  68. return kcp;
  69. }
  70. static void hio_write_kcp_event_cb(hevent_t* ev) {
  71. hio_t* io = (hio_t*)ev->userdata;
  72. hbuf_t* buf = (hbuf_t*)ev->privdata;
  73. hio_write_kcp(io, buf->base + sizeof(sockaddr_u), buf->len - sizeof(sockaddr_u), (struct sockaddr*)buf->base);
  74. HV_FREE(buf);
  75. }
  76. static int hio_write_kcp_async(hio_t* io, const void* data, size_t len, struct sockaddr* addr) {
  77. hbuf_t* buf = NULL;
  78. HV_ALLOC(buf, sizeof(hbuf_t) + sizeof(sockaddr_u) + len);
  79. buf->base = (char*)buf + sizeof(hbuf_t);
  80. buf->len = len + sizeof(sockaddr_u);
  81. memcpy(buf->base, addr, sizeof(sockaddr_u));
  82. memcpy(buf->base + sizeof(sockaddr_u), data, len);
  83. hevent_t ev;
  84. memset(&ev, 0, sizeof(ev));
  85. ev.cb = hio_write_kcp_event_cb;
  86. ev.userdata = io;
  87. ev.privdata = buf;
  88. hloop_post_event(io->loop, &ev);
  89. return len;
  90. }
  91. int hio_write_kcp(hio_t* io, const void* buf, size_t len, struct sockaddr* addr) {
  92. if (hv_gettid() != io->loop->tid) {
  93. return hio_write_kcp_async(io, buf, len, addr);
  94. }
  95. IUINT32 conv = io->kcp_setting ? io->kcp_setting->conv : 0;
  96. kcp_t* kcp = hio_get_kcp(io, conv, addr);
  97. // printf("hio_write_kcp conv=%u=%u\n", conv, kcp->conv);
  98. int nsend = ikcp_send(kcp->ikcp, (const char*)buf, len);
  99. // printf("ikcp_send len=%d nsend=%d\n", (int)len, nsend);
  100. if (nsend < 0) {
  101. hloge("ikcp_send error: %d", nsend);
  102. return nsend;
  103. }
  104. ikcp_update(kcp->ikcp, (IUINT32)io->loop->cur_hrtime / 1000);
  105. return len;
  106. }
  107. int hio_read_kcp (hio_t* io, void* buf, int readbytes) {
  108. IUINT32 conv = ikcp_getconv(buf);
  109. kcp_t* kcp = hio_get_kcp(io, conv, NULL);
  110. // printf("hio_read_kcp conv=%u=%u\n", conv, kcp->conv);
  111. if (kcp->conv != conv) {
  112. hloge("recv invalid kcp packet!");
  113. hio_close_rudp(io, io->peeraddr);
  114. return -1;
  115. }
  116. // printf("ikcp_input len=%d\n", readbytes);
  117. int ret = ikcp_input(kcp->ikcp, (const char*)buf, readbytes);
  118. // printf("ikcp_input ret=%d\n", ret);
  119. if (ret != 0) {
  120. return 0;
  121. }
  122. if (kcp->readbuf.base == NULL || kcp->readbuf.len == 0) {
  123. kcp->readbuf.len = DEFAULT_KCP_READ_BUFSIZE;
  124. HV_ALLOC(kcp->readbuf.base, kcp->readbuf.len);
  125. }
  126. while (1) {
  127. int nrecv = ikcp_recv(kcp->ikcp, kcp->readbuf.base, kcp->readbuf.len);
  128. // printf("ikcp_recv nrecv=%d\n", nrecv);
  129. if (nrecv < 0) break;
  130. hio_read_cb(io, kcp->readbuf.base, nrecv);
  131. ret += nrecv;
  132. }
  133. return ret;
  134. }
  135. #endif