Kaynağa Gözat

optimize code for rudp, kcp

ithewei 4 yıl önce
ebeveyn
işleme
eefc043fd4
6 değiştirilmiş dosya ile 181 ekleme ve 165 silme
  1. 0 127
      event/hevent.c
  2. 0 9
      event/hevent.h
  3. 116 0
      event/kcp/hkcp.c
  4. 30 0
      event/kcp/hkcp.h
  5. 30 14
      event/rudp.c
  6. 5 15
      event/rudp.h

+ 0 - 127
event/hevent.c

@@ -739,130 +739,3 @@ hio_t* hio_setup_udp_upstream(hio_t* io, const char* host, int port) {
     return upstream_io;
     return upstream_io;
 }
 }
 
 
-#if WITH_RUDP
-rudp_entry_t* hio_get_rudp(hio_t* io) {
-    rudp_entry_t* rudp = rudp_get(&io->rudp, io->peeraddr);
-    rudp->io = io;
-    return rudp;
-}
-
-static void hio_close_rudp_event_cb(hevent_t* ev) {
-    rudp_entry_t* entry = (rudp_entry_t*)ev->userdata;
-    rudp_del(&entry->io->rudp, (struct sockaddr*)&entry->addr);
-    // rudp_entry_free(entry);
-}
-
-int hio_close_rudp(hio_t* io, struct sockaddr* peeraddr) {
-    if (peeraddr == NULL) peeraddr = io->peeraddr;
-    // NOTE: do rudp_del for thread-safe
-    rudp_entry_t* entry = rudp_get(&io->rudp, peeraddr);
-    // NOTE: just rudp_remove first, do rudp_entry_free async for safe.
-    // rudp_entry_t* entry = rudp_remove(&io->rudp, peeraddr);
-    if (entry) {
-        hevent_t ev;
-        memset(&ev, 0, sizeof(ev));
-        ev.cb = hio_close_rudp_event_cb;
-        ev.userdata = entry;
-        ev.priority = HEVENT_HIGH_PRIORITY;
-        hloop_post_event(io->loop, &ev);
-    }
-    return 0;
-}
-#endif
-
-#if WITH_KCP
-static kcp_setting_t s_kcp_setting;
-static int __kcp_output(const char* buf, int len, ikcpcb* ikcp, void* userdata) {
-    // printf("ikcp_output len=%d\n", len);
-    rudp_entry_t* rudp = (rudp_entry_t*)userdata;
-    assert(rudp != NULL && rudp->io != NULL);
-    int nsend = sendto(rudp->io->fd, buf, len, 0, &rudp->addr.sa, SOCKADDR_LEN(&rudp->addr));
-    // printf("sendto nsend=%d\n", nsend);
-    return nsend;
-}
-
-static void __kcp_update_timer_cb(htimer_t* timer) {
-    rudp_entry_t* rudp = (rudp_entry_t*)timer->privdata;
-    assert(rudp != NULL && rudp->io != NULL && rudp->kcp.ikcp != NULL);
-    ikcp_update(rudp->kcp.ikcp, (IUINT32)(rudp->io->loop->cur_hrtime / 1000));
-}
-
-int hio_set_kcp(hio_t* io, kcp_setting_t* setting) {
-    io->io_type = HIO_TYPE_KCP;
-    io->kcp_setting = setting;
-    return 0;
-}
-
-kcp_t* hio_get_kcp(hio_t* io, uint32_t conv) {
-    rudp_entry_t* rudp = hio_get_rudp(io);
-    assert(rudp != NULL);
-    kcp_t* kcp = &rudp->kcp;
-    if (kcp->ikcp != NULL) return kcp;
-    if (io->kcp_setting == NULL) {
-        io->kcp_setting = &s_kcp_setting;
-    }
-    kcp_setting_t* setting = io->kcp_setting;
-    assert(io->kcp_setting != NULL);
-    kcp->ikcp = ikcp_create(conv, rudp);
-    // printf("ikcp_create conv=%u ikcp=%p\n", conv, kcp->ikcp);
-    kcp->ikcp->output = __kcp_output;
-    kcp->conv = conv;
-    if (setting->interval > 0) {
-        ikcp_nodelay(kcp->ikcp, setting->nodelay, setting->interval, setting->fastresend, setting->nocwnd);
-    }
-    if (setting->sndwnd > 0 && setting->rcvwnd > 0) {
-        ikcp_wndsize(kcp->ikcp, setting->sndwnd, setting->rcvwnd);
-    }
-    if (setting->mtu > 0) {
-        ikcp_setmtu(kcp->ikcp, setting->mtu);
-    }
-    if (kcp->update_timer == NULL) {
-        int update_interval = setting->update_interval;
-        if (update_interval == 0) {
-            update_interval = DEFAULT_KCP_UPDATE_INTERVAL;
-        }
-        kcp->update_timer = htimer_add(io->loop, __kcp_update_timer_cb, update_interval, INFINITE);
-        kcp->update_timer->privdata = rudp;
-    }
-    // NOTE: alloc kcp->readbuf when hio_read_kcp
-    return kcp;
-}
-
-int hio_write_kcp(hio_t* io, const void* buf, size_t len) {
-    IUINT32 conv = io->kcp_setting ? io->kcp_setting->conv : 0;
-    kcp_t* kcp = hio_get_kcp(io, conv);
-    int nsend = ikcp_send(kcp->ikcp, (const char*)buf, len);
-    // printf("ikcp_send len=%d nsend=%d\n", (int)len, nsend);
-    if (nsend < 0) {
-        hio_close(io);
-    } else {
-        ikcp_update(kcp->ikcp, (IUINT32)io->loop->cur_hrtime / 1000);
-    }
-    return nsend;
-}
-
-int hio_read_kcp (hio_t* io, void* buf, int readbytes) {
-    IUINT32 conv = ikcp_getconv(buf);
-    kcp_t* kcp = hio_get_kcp(io, conv);
-    if (kcp->conv != conv) {
-        hloge("recv invalid kcp packet!");
-        hio_close_rudp(io, io->peeraddr);
-        return -1;
-    }
-    // printf("ikcp_input len=%d\n", readbytes);
-    ikcp_input(kcp->ikcp, (const char*)buf, readbytes);
-    if (kcp->readbuf.base == NULL || kcp->readbuf.len == 0) {
-        kcp->readbuf.len = DEFAULT_KCP_READ_BUFSIZE;
-        HV_ALLOC(kcp->readbuf.base, kcp->readbuf.len);
-    }
-    int ret = 0;
-    while (1) {
-        int nrecv = ikcp_recv(kcp->ikcp, kcp->readbuf.base, kcp->readbuf.len);
-        // printf("ikcp_recv nrecv=%d\n", nrecv);
-        if (nrecv < 0) break;
-        hio_read_cb(io, kcp->readbuf.base, nrecv);
-        ret += nrecv;
-    }
-    return ret;
-}
-#endif

+ 0 - 9
event/hevent.h

@@ -210,15 +210,6 @@ static inline bool hio_is_alloced_readbuf(hio_t* io) {
 void hio_alloc_readbuf(hio_t* io, int len);
 void hio_alloc_readbuf(hio_t* io, int len);
 void hio_free_readbuf(hio_t* io);
 void hio_free_readbuf(hio_t* io);
 
 
-#if WITH_RUDP
-rudp_entry_t* hio_get_rudp(hio_t* io);
-#if WITH_KCP
-kcp_t*  hio_get_kcp(hio_t* io, uint32_t conv);
-int     hio_write_kcp(hio_t* io, const void* buf, size_t len);
-int     hio_read_kcp (hio_t* io, void* buf, int readbytes);
-#endif
-#endif
-
 #define EVENT_ENTRY(p)          container_of(p, hevent_t, pending_node)
 #define EVENT_ENTRY(p)          container_of(p, hevent_t, pending_node)
 #define IDLE_ENTRY(p)           container_of(p, hidle_t,  node)
 #define IDLE_ENTRY(p)           container_of(p, hidle_t,  node)
 #define TIMER_ENTRY(p)          container_of(p, htimer_t, node)
 #define TIMER_ENTRY(p)          container_of(p, htimer_t, node)

+ 116 - 0
event/kcp/hkcp.c

@@ -0,0 +1,116 @@
+#include "hkcp.h"
+
+#if WITH_KCP
+
+#include "hevent.h"
+#include "hlog.h"
+
+static kcp_setting_t s_kcp_setting;
+
+static int __kcp_output(const char* buf, int len, ikcpcb* ikcp, void* userdata) {
+    // printf("ikcp_output len=%d\n", len);
+    rudp_entry_t* rudp = (rudp_entry_t*)userdata;
+    assert(rudp != NULL && rudp->io != NULL);
+    int nsend = sendto(rudp->io->fd, buf, len, 0, &rudp->addr.sa, SOCKADDR_LEN(&rudp->addr));
+    // printf("sendto nsend=%d\n", nsend);
+    return nsend;
+}
+
+static void __kcp_update_timer_cb(htimer_t* timer) {
+    rudp_entry_t* rudp = (rudp_entry_t*)timer->privdata;
+    assert(rudp != NULL && rudp->io != NULL && rudp->kcp.ikcp != NULL);
+    ikcp_update(rudp->kcp.ikcp, (IUINT32)(rudp->io->loop->cur_hrtime / 1000));
+}
+
+void kcp_release(kcp_t* kcp) {
+    if (kcp->ikcp == NULL) return;
+    if (kcp->update_timer) {
+        htimer_del(kcp->update_timer);
+        kcp->update_timer = NULL;
+    }
+    HV_FREE(kcp->readbuf.base);
+    kcp->readbuf.len = 0;
+    // printf("ikcp_release ikcp=%p\n", kcp->ikcp);
+    ikcp_release(kcp->ikcp);
+    kcp->ikcp = NULL;
+}
+
+int hio_set_kcp(hio_t* io, kcp_setting_t* setting) {
+    io->io_type = HIO_TYPE_KCP;
+    io->kcp_setting = setting;
+    return 0;
+}
+
+kcp_t* hio_get_kcp(hio_t* io, uint32_t conv) {
+    rudp_entry_t* rudp = hio_get_rudp(io);
+    assert(rudp != NULL);
+    kcp_t* kcp = &rudp->kcp;
+    if (kcp->ikcp != NULL) return kcp;
+    if (io->kcp_setting == NULL) {
+        io->kcp_setting = &s_kcp_setting;
+    }
+    kcp_setting_t* setting = io->kcp_setting;
+    kcp->ikcp = ikcp_create(conv, rudp);
+    // printf("ikcp_create conv=%u ikcp=%p\n", conv, kcp->ikcp);
+    kcp->ikcp->output = __kcp_output;
+    kcp->conv = conv;
+    if (setting->interval > 0) {
+        ikcp_nodelay(kcp->ikcp, setting->nodelay, setting->interval, setting->fastresend, setting->nocwnd);
+    }
+    if (setting->sndwnd > 0 && setting->rcvwnd > 0) {
+        ikcp_wndsize(kcp->ikcp, setting->sndwnd, setting->rcvwnd);
+    }
+    if (setting->mtu > 0) {
+        ikcp_setmtu(kcp->ikcp, setting->mtu);
+    }
+    if (kcp->update_timer == NULL) {
+        int update_interval = setting->update_interval;
+        if (update_interval == 0) {
+            update_interval = DEFAULT_KCP_UPDATE_INTERVAL;
+        }
+        kcp->update_timer = htimer_add(io->loop, __kcp_update_timer_cb, update_interval, INFINITE);
+        kcp->update_timer->privdata = rudp;
+    }
+    // NOTE: alloc kcp->readbuf when hio_read_kcp
+    return kcp;
+}
+
+int hio_write_kcp(hio_t* io, const void* buf, size_t len) {
+    IUINT32 conv = io->kcp_setting ? io->kcp_setting->conv : 0;
+    kcp_t* kcp = hio_get_kcp(io, conv);
+    int nsend = ikcp_send(kcp->ikcp, (const char*)buf, len);
+    // printf("ikcp_send len=%d nsend=%d\n", (int)len, nsend);
+    if (nsend < 0) {
+        hio_close(io);
+    } else {
+        ikcp_update(kcp->ikcp, (IUINT32)io->loop->cur_hrtime / 1000);
+    }
+    return nsend;
+}
+
+int hio_read_kcp (hio_t* io, void* buf, int readbytes) {
+    IUINT32 conv = ikcp_getconv(buf);
+    kcp_t* kcp = hio_get_kcp(io, conv);
+    if (kcp->conv != conv) {
+        hloge("recv invalid kcp packet!");
+        hio_close_rudp(io, io->peeraddr);
+        return -1;
+    }
+    // printf("ikcp_input len=%d\n", readbytes);
+    ikcp_input(kcp->ikcp, (const char*)buf, readbytes);
+    if (kcp->readbuf.base == NULL || kcp->readbuf.len == 0) {
+        kcp->readbuf.len = DEFAULT_KCP_READ_BUFSIZE;
+        HV_ALLOC(kcp->readbuf.base, kcp->readbuf.len);
+    }
+    int ret = 0;
+    while (1) {
+        int nrecv = ikcp_recv(kcp->ikcp, kcp->readbuf.base, kcp->readbuf.len);
+        // printf("ikcp_recv nrecv=%d\n", nrecv);
+        if (nrecv < 0) break;
+        hio_read_cb(io, kcp->readbuf.base, nrecv);
+        ret += nrecv;
+    }
+    return ret;
+}
+
+#endif

+ 30 - 0
event/kcp/hkcp.h

@@ -0,0 +1,30 @@
+#ifndef HV_KCP_H_
+#define HV_KCP_H_
+
+#include "hloop.h"
+
+#if WITH_KCP
+
+#include "ikcp.h"
+#include "hbuf.h"
+
+#define DEFAULT_KCP_UPDATE_INTERVAL 10 // ms
+#define DEFAULT_KCP_READ_BUFSIZE    1400
+
+typedef struct kcp_s {
+    ikcpcb*         ikcp;
+    uint32_t        conv;
+    htimer_t*       update_timer;
+    hbuf_t          readbuf;
+} kcp_t;
+
+// NOTE: kcp_create in hio_get_kcp
+void kcp_release(kcp_t* kcp);
+
+kcp_t* hio_get_kcp  (hio_t* io, uint32_t conv);
+int    hio_read_kcp (hio_t* io, void* buf, int readbytes);
+int    hio_write_kcp(hio_t* io, const void* buf, size_t len);
+
+#endif
+
+#endif // HV_KCP_H_

+ 30 - 14
event/rudp.c

@@ -2,20 +2,7 @@
 
 
 #if WITH_RUDP
 #if WITH_RUDP
 
 
-#if WITH_KCP
-void kcp_release(kcp_t* kcp) {
-    if (kcp->ikcp == NULL) return;
-    if (kcp->update_timer) {
-        htimer_del(kcp->update_timer);
-        kcp->update_timer = NULL;
-    }
-    HV_FREE(kcp->readbuf.base);
-    kcp->readbuf.len = 0;
-    // printf("ikcp_release ikcp=%p\n", kcp->ikcp);
-    ikcp_release(kcp->ikcp);
-    kcp->ikcp = NULL;
-}
-#endif
+#include "hevent.h"
 
 
 void rudp_entry_free(rudp_entry_t* entry) {
 void rudp_entry_free(rudp_entry_t* entry) {
 #if WITH_KCP
 #if WITH_KCP
@@ -148,4 +135,33 @@ void rudp_del(rudp_t* rudp, struct sockaddr* addr) {
     hmutex_unlock(&rudp->mutex);
     hmutex_unlock(&rudp->mutex);
 }
 }
 
 
+rudp_entry_t* hio_get_rudp(hio_t* io) {
+    rudp_entry_t* rudp = rudp_get(&io->rudp, io->peeraddr);
+    rudp->io = io;
+    return rudp;
+}
+
+static void hio_close_rudp_event_cb(hevent_t* ev) {
+    rudp_entry_t* entry = (rudp_entry_t*)ev->userdata;
+    rudp_del(&entry->io->rudp, (struct sockaddr*)&entry->addr);
+    // rudp_entry_free(entry);
+}
+
+int hio_close_rudp(hio_t* io, struct sockaddr* peeraddr) {
+    if (peeraddr == NULL) peeraddr = io->peeraddr;
+    // NOTE: do rudp_del for thread-safe
+    rudp_entry_t* entry = rudp_get(&io->rudp, peeraddr);
+    // NOTE: just rudp_remove first, do rudp_entry_free async for safe.
+    // rudp_entry_t* entry = rudp_remove(&io->rudp, peeraddr);
+    if (entry) {
+        hevent_t ev;
+        memset(&ev, 0, sizeof(ev));
+        ev.cb = hio_close_rudp_event_cb;
+        ev.userdata = entry;
+        ev.priority = HEVENT_HIGH_PRIORITY;
+        hloop_post_event(io->loop, &ev);
+    }
+    return 0;
+}
+
 #endif
 #endif

+ 5 - 15
event/rudp.h

@@ -8,22 +8,8 @@
 #include "rbtree.h"
 #include "rbtree.h"
 #include "hsocket.h"
 #include "hsocket.h"
 #include "hmutex.h"
 #include "hmutex.h"
-
 #if WITH_KCP
 #if WITH_KCP
-#include "kcp/ikcp.h"
-#include "hbuf.h"
-#define DEFAULT_KCP_UPDATE_INTERVAL 10 // ms
-#define DEFAULT_KCP_READ_BUFSIZE    1400
-
-typedef struct kcp_s {
-    ikcpcb*         ikcp;
-    uint32_t        conv;
-    htimer_t*       update_timer;
-    hbuf_t          readbuf;
-} kcp_t;
-
-// NOTE: kcp_create in hio_get_kcp
-void kcp_release(kcp_t* kcp);
+#include "kcp/hkcp.h"
 #endif
 #endif
 
 
 typedef struct rudp_s {
 typedef struct rudp_s {
@@ -40,6 +26,7 @@ typedef struct rudp_entry_s {
     kcp_t           kcp;
     kcp_t           kcp;
 #endif
 #endif
 } rudp_entry_t;
 } rudp_entry_t;
+
 // NOTE: rudp_entry_t alloc when rudp_get
 // NOTE: rudp_entry_t alloc when rudp_get
 void rudp_entry_free(rudp_entry_t* entry);
 void rudp_entry_free(rudp_entry_t* entry);
 
 
@@ -57,6 +44,9 @@ rudp_entry_t* rudp_get(rudp_t* rudp, struct sockaddr* addr);
 // rudp_remove + free
 // rudp_remove + free
 void          rudp_del(rudp_t* rudp, struct sockaddr* addr);
 void          rudp_del(rudp_t* rudp, struct sockaddr* addr);
 
 
+// rudp_get(&io->rudp, io->peeraddr)
+rudp_entry_t* hio_get_rudp(hio_t* io);
+
 #endif // WITH_RUDP
 #endif // WITH_RUDP
 
 
 #endif // HV_RUDP_H_
 #endif // HV_RUDP_H_