ithewei hace 4 años
padre
commit
1c5dfb7863
Se han modificado 3 ficheros con 236 adiciones y 232 borrados
  1. 229 8
      event/hevent.c
  2. 3 221
      event/hloop.c
  3. 4 3
      event/nio.c

+ 229 - 8
event/hevent.c

@@ -13,6 +13,173 @@ uint32_t hio_next_id() {
     return ++s_id;
 }
 
+static void fill_io_type(hio_t* io) {
+    int type = 0;
+    socklen_t optlen = sizeof(int);
+    int ret = getsockopt(io->fd, SOL_SOCKET, SO_TYPE, (char*)&type, &optlen);
+    printd("getsockopt SO_TYPE fd=%d ret=%d type=%d errno=%d\n", io->fd, ret, type, socket_errno());
+    if (ret == 0) {
+        switch (type) {
+        case SOCK_STREAM:   io->io_type = HIO_TYPE_TCP; break;
+        case SOCK_DGRAM:    io->io_type = HIO_TYPE_UDP; break;
+        case SOCK_RAW:      io->io_type = HIO_TYPE_IP;  break;
+        default: io->io_type = HIO_TYPE_SOCKET;         break;
+        }
+    }
+    else if (socket_errno() == ENOTSOCK) {
+        switch (io->fd) {
+        case 0: io->io_type = HIO_TYPE_STDIN;   break;
+        case 1: io->io_type = HIO_TYPE_STDOUT;  break;
+        case 2: io->io_type = HIO_TYPE_STDERR;  break;
+        default: io->io_type = HIO_TYPE_FILE;   break;
+        }
+    }
+    else {
+        io->io_type = HIO_TYPE_TCP;
+    }
+}
+
+static void hio_socket_init(hio_t* io) {
+    // nonblocking
+    nonblocking(io->fd);
+    // fill io->localaddr io->peeraddr
+    if (io->localaddr == NULL) {
+        HV_ALLOC(io->localaddr, sizeof(sockaddr_u));
+    }
+    if (io->peeraddr == NULL) {
+        HV_ALLOC(io->peeraddr, sizeof(sockaddr_u));
+    }
+    socklen_t addrlen = sizeof(sockaddr_u);
+    int ret = getsockname(io->fd, io->localaddr, &addrlen);
+    printd("getsockname fd=%d ret=%d errno=%d\n", io->fd, ret, socket_errno());
+    // NOTE:
+    // tcp_server peeraddr set by accept
+    // udp_server peeraddr set by recvfrom
+    // tcp_client/udp_client peeraddr set by hio_setpeeraddr
+    if (io->io_type == HIO_TYPE_TCP || io->io_type == HIO_TYPE_SSL) {
+        // tcp acceptfd
+        addrlen = sizeof(sockaddr_u);
+        ret = getpeername(io->fd, io->peeraddr, &addrlen);
+        printd("getpeername fd=%d ret=%d errno=%d\n", io->fd, ret, socket_errno());
+    }
+}
+
+void hio_init(hio_t* io) {
+    // alloc localaddr,peeraddr when hio_socket_init
+    /*
+    if (io->localaddr == NULL) {
+        HV_ALLOC(io->localaddr, sizeof(sockaddr_u));
+    }
+    if (io->peeraddr == NULL) {
+        HV_ALLOC(io->peeraddr, sizeof(sockaddr_u));
+    }
+    */
+
+    // write_queue init when hwrite try_write failed
+    // write_queue_init(&io->write_queue, 4);
+
+    hrecursive_mutex_init(&io->write_mutex);
+}
+
+void hio_ready(hio_t* io) {
+    if (io->ready) return;
+    // flags
+    io->ready = 1;
+    io->closed = 0;
+    io->accept = io->connect = io->connectex = 0;
+    io->recv = io->send = 0;
+    io->recvfrom = io->sendto = 0;
+    io->close = 0;
+    // public:
+    io->id = hio_next_id();
+    io->io_type = HIO_TYPE_UNKNOWN;
+    io->error = 0;
+    io->events = io->revents = 0;
+    // readbuf
+    io->alloced_readbuf = 0;
+    io->readbuf.base = io->loop->readbuf.base;
+    io->readbuf.len = io->loop->readbuf.len;
+    io->readbuf.offset = 0;
+    io->read_once = 0;
+    io->read_until = 0;
+    io->small_readbytes_cnt = 0;
+    // write_queue
+    io->write_queue_bytes = 0;
+    // callbacks
+    io->read_cb = NULL;
+    io->write_cb = NULL;
+    io->close_cb = NULL;
+    io->accept_cb = NULL;
+    io->connect_cb = NULL;
+    // timers
+    io->connect_timeout = 0;
+    io->connect_timer = NULL;
+    io->close_timeout = 0;
+    io->close_timer = NULL;
+    io->keepalive_timeout = 0;
+    io->keepalive_timer = NULL;
+    io->heartbeat_interval = 0;
+    io->heartbeat_fn = NULL;
+    io->heartbeat_timer = NULL;
+    // upstream
+    io->upstream_io = NULL;
+    // unpack
+    io->unpack_setting = NULL;
+    // private:
+    io->event_index[0] = io->event_index[1] = -1;
+    io->hovlp = NULL;
+    io->ssl = NULL;
+
+    // io_type
+    fill_io_type(io);
+    if (io->io_type & HIO_TYPE_SOCKET) {
+        hio_socket_init(io);
+    }
+}
+
+void hio_done(hio_t* io) {
+    if (!io->ready) return;
+    io->ready = 0;
+
+    hio_del(io, HV_RDWR);
+
+    // readbuf
+    hio_free_readbuf(io);
+
+    // write_queue
+    offset_buf_t* pbuf = NULL;
+    hrecursive_mutex_lock(&io->write_mutex);
+    while (!write_queue_empty(&io->write_queue)) {
+        pbuf = write_queue_front(&io->write_queue);
+        HV_FREE(pbuf->base);
+        write_queue_pop_front(&io->write_queue);
+    }
+    write_queue_cleanup(&io->write_queue);
+    hrecursive_mutex_unlock(&io->write_mutex);
+}
+
+void hio_free(hio_t* io) {
+    if (io == NULL) return;
+    // NOTE: call hio_done to cleanup write_queue
+    hio_done(io);
+    // NOTE: call hio_close to call hclose_cb
+    hio_close(io);
+    hrecursive_mutex_destroy(&io->write_mutex);
+    HV_FREE(io->localaddr);
+    HV_FREE(io->peeraddr);
+    HV_FREE(io);
+}
+
+bool hio_is_opened(hio_t* io) {
+    if (io == NULL) return false;
+    return io->ready == 1 && io->closed == 0;
+}
+
+bool hio_is_closed(hio_t* io) {
+    if (io == NULL) return true;
+    return io->ready == 0 && io->closed == 1;
+}
+
 uint32_t hio_id (hio_t* io) {
     return io->id;
 }
@@ -340,14 +507,7 @@ int hio_read_until(hio_t* io, int len) {
     return hio_read_once(io);
 }
 
-void hio_unset_unpack(hio_t* io) {
-    if (io->unpack_setting) {
-        io->unpack_setting = NULL;
-        // NOTE: unpack has own readbuf
-        hio_free_readbuf(io);
-    }
-}
-
+//-----------------unpack---------------------------------------------
 void hio_set_unpack(hio_t* io, unpack_setting_t* setting) {
     hio_unset_unpack(io);
     if (setting == NULL) return;
@@ -379,3 +539,64 @@ void hio_set_unpack(hio_t* io, unpack_setting_t* setting) {
     }
     hio_alloc_readbuf(io, io->readbuf.len);
 }
+
+void hio_unset_unpack(hio_t* io) {
+    if (io->unpack_setting) {
+        io->unpack_setting = NULL;
+        // NOTE: unpack has own readbuf
+        hio_free_readbuf(io);
+    }
+}
+
+//-----------------upstream---------------------------------------------
+void hio_read_upstream(hio_t* io) {
+    hio_t* upstream_io = io->upstream_io;
+    if (upstream_io) {
+        hio_read(io);
+        hio_read(upstream_io);
+    }
+}
+
+void hio_write_upstream(hio_t* io, void* buf, int bytes) {
+    hio_t* upstream_io = io->upstream_io;
+    if (upstream_io) {
+        hio_write(upstream_io, buf, bytes);
+    }
+}
+
+void hio_close_upstream(hio_t* io) {
+    hio_t* upstream_io = io->upstream_io;
+    if (upstream_io) {
+        hio_close(upstream_io);
+    }
+}
+
+void hio_setup_upstream(hio_t* io1, hio_t* io2) {
+    io1->upstream_io = io2;
+    io2->upstream_io = io1;
+    hio_setcb_read(io1, hio_write_upstream);
+    hio_setcb_read(io2, hio_write_upstream);
+}
+
+hio_t* hio_get_upstream(hio_t* io) {
+    return io->upstream_io;
+}
+
+hio_t* hio_setup_tcp_upstream(hio_t* io, const char* host, int port, int ssl) {
+    hio_t* upstream_io = hio_create(io->loop, host, port, SOCK_STREAM);
+    if (upstream_io == NULL) return NULL;
+    if (ssl) hio_enable_ssl(upstream_io);
+    hio_setup_upstream(io, upstream_io);
+    hio_setcb_close(io, hio_close_upstream);
+    hio_setcb_close(upstream_io, hio_close_upstream);
+    hconnect(io->loop, upstream_io->fd, hio_read_upstream);
+    return upstream_io;
+}
+
+hio_t* hio_setup_udp_upstream(hio_t* io, const char* host, int port) {
+    hio_t* upstream_io = hio_create(io->loop, host, port, SOCK_DGRAM);
+    if (upstream_io == NULL) return NULL;
+    hio_setup_upstream(io, upstream_io);
+    hio_read_upstream(io);
+    return upstream_io;
+}

+ 3 - 221
event/hloop.c

@@ -354,7 +354,7 @@ void hloop_free(hloop_t** pp) {
     }
 }
 
-// while(loop->status) { hloop_process_events(loop); }
+// while (loop->status) { hloop_process_events(loop); }
 int hloop_run(hloop_t* loop) {
     if (loop == NULL) return -1;
     if (loop->status == HLOOP_STATUS_RUNNING) return -2;
@@ -597,173 +597,6 @@ const char* hio_engine() {
 #endif
 }
 
-static void fill_io_type(hio_t* io) {
-    int type = 0;
-    socklen_t optlen = sizeof(int);
-    int ret = getsockopt(io->fd, SOL_SOCKET, SO_TYPE, (char*)&type, &optlen);
-    printd("getsockopt SO_TYPE fd=%d ret=%d type=%d errno=%d\n", io->fd, ret, type, socket_errno());
-    if (ret == 0) {
-        switch (type) {
-        case SOCK_STREAM:   io->io_type = HIO_TYPE_TCP; break;
-        case SOCK_DGRAM:    io->io_type = HIO_TYPE_UDP; break;
-        case SOCK_RAW:      io->io_type = HIO_TYPE_IP;  break;
-        default: io->io_type = HIO_TYPE_SOCKET;         break;
-        }
-    }
-    else if (socket_errno() == ENOTSOCK) {
-        switch (io->fd) {
-        case 0: io->io_type = HIO_TYPE_STDIN;   break;
-        case 1: io->io_type = HIO_TYPE_STDOUT;  break;
-        case 2: io->io_type = HIO_TYPE_STDERR;  break;
-        default: io->io_type = HIO_TYPE_FILE;   break;
-        }
-    }
-    else {
-        io->io_type = HIO_TYPE_TCP;
-    }
-}
-
-static void hio_socket_init(hio_t* io) {
-    // nonblocking
-    nonblocking(io->fd);
-    // fill io->localaddr io->peeraddr
-    if (io->localaddr == NULL) {
-        HV_ALLOC(io->localaddr, sizeof(sockaddr_u));
-    }
-    if (io->peeraddr == NULL) {
-        HV_ALLOC(io->peeraddr, sizeof(sockaddr_u));
-    }
-    socklen_t addrlen = sizeof(sockaddr_u);
-    int ret = getsockname(io->fd, io->localaddr, &addrlen);
-    printd("getsockname fd=%d ret=%d errno=%d\n", io->fd, ret, socket_errno());
-    // NOTE:
-    // tcp_server peeraddr set by accept
-    // udp_server peeraddr set by recvfrom
-    // tcp_client/udp_client peeraddr set by hio_setpeeraddr
-    if (io->io_type == HIO_TYPE_TCP || io->io_type == HIO_TYPE_SSL) {
-        // tcp acceptfd
-        addrlen = sizeof(sockaddr_u);
-        ret = getpeername(io->fd, io->peeraddr, &addrlen);
-        printd("getpeername fd=%d ret=%d errno=%d\n", io->fd, ret, socket_errno());
-    }
-}
-
-void hio_init(hio_t* io) {
-    // alloc localaddr,peeraddr when hio_socket_init
-    /*
-    if (io->localaddr == NULL) {
-        HV_ALLOC(io->localaddr, sizeof(sockaddr_u));
-    }
-    if (io->peeraddr == NULL) {
-        HV_ALLOC(io->peeraddr, sizeof(sockaddr_u));
-    }
-    */
-
-    // write_queue init when hwrite try_write failed
-    // write_queue_init(&io->write_queue, 4);
-
-    hrecursive_mutex_init(&io->write_mutex);
-}
-
-void hio_ready(hio_t* io) {
-    if (io->ready) return;
-    // flags
-    io->ready = 1;
-    io->closed = 0;
-    io->accept = io->connect = io->connectex = 0;
-    io->recv = io->send = 0;
-    io->recvfrom = io->sendto = 0;
-    io->close = 0;
-    // public:
-    io->id = hio_next_id();
-    io->io_type = HIO_TYPE_UNKNOWN;
-    io->error = 0;
-    io->events = io->revents = 0;
-    // readbuf
-    io->alloced_readbuf = 0;
-    io->readbuf.base = io->loop->readbuf.base;
-    io->readbuf.len = io->loop->readbuf.len;
-    io->readbuf.offset = 0;
-    io->read_once = 0;
-    io->read_until = 0;
-    io->small_readbytes_cnt = 0;
-    // write_queue
-    io->write_queue_bytes = 0;
-    // callbacks
-    io->read_cb = NULL;
-    io->write_cb = NULL;
-    io->close_cb = NULL;
-    io->accept_cb = NULL;
-    io->connect_cb = NULL;
-    // timers
-    io->connect_timeout = 0;
-    io->connect_timer = NULL;
-    io->close_timeout = 0;
-    io->close_timer = NULL;
-    io->keepalive_timeout = 0;
-    io->keepalive_timer = NULL;
-    io->heartbeat_interval = 0;
-    io->heartbeat_fn = NULL;
-    io->heartbeat_timer = NULL;
-    // upstream
-    io->upstream_io = NULL;
-    // unpack
-    io->unpack_setting = NULL;
-    // private:
-    io->event_index[0] = io->event_index[1] = -1;
-    io->hovlp = NULL;
-    io->ssl = NULL;
-
-    // io_type
-    fill_io_type(io);
-    if (io->io_type & HIO_TYPE_SOCKET) {
-        hio_socket_init(io);
-    }
-}
-
-void hio_done(hio_t* io) {
-    if (!io->ready) return;
-    io->ready = 0;
-
-    hio_del(io, HV_RDWR);
-
-    // readbuf
-    hio_free_readbuf(io);
-
-    // write_queue
-    offset_buf_t* pbuf = NULL;
-    hrecursive_mutex_lock(&io->write_mutex);
-    while (!write_queue_empty(&io->write_queue)) {
-        pbuf = write_queue_front(&io->write_queue);
-        HV_FREE(pbuf->base);
-        write_queue_pop_front(&io->write_queue);
-    }
-    write_queue_cleanup(&io->write_queue);
-    hrecursive_mutex_unlock(&io->write_mutex);
-}
-
-void hio_free(hio_t* io) {
-    if (io == NULL) return;
-    // NOTE: call hio_done to cleanup write_queue
-    hio_done(io);
-    // NOTE: call hio_close to call hclose_cb
-    hio_close(io);
-    hrecursive_mutex_destroy(&io->write_mutex);
-    HV_FREE(io->localaddr);
-    HV_FREE(io->peeraddr);
-    HV_FREE(io);
-}
-
-bool hio_is_opened(hio_t* io) {
-    if (io == NULL) return false;
-    return io->ready == 1 && io->closed == 0;
-}
-
-bool hio_is_closed(hio_t* io) {
-    if (io == NULL) return true;
-    return io->ready == 0 && io->closed == 1;
-}
-
 hio_t* hio_get(hloop_t* loop, int fd) {
     if (fd >= loop->ios.maxsize) {
         int newsize = ceil2e(fd);
@@ -875,6 +708,7 @@ int hio_close_async(hio_t* io) {
     return 0;
 }
 
+//------------------high-level apis-------------------------------------------
 hio_t* hread(hloop_t* loop, int fd, void* buf, size_t len, hread_cb read_cb) {
     hio_t* io = hio_get(loop, fd);
     assert(io != NULL);
@@ -961,6 +795,7 @@ hio_t* hsendto (hloop_t* loop, int sockfd, const void* buf, size_t len, hwrite_c
     return hwrite(loop, sockfd, buf, len, write_cb);
 }
 
+//-----------------top-level apis---------------------------------------------
 hio_t* hio_create(hloop_t* loop, const char* host, int port, int type) {
     sockaddr_u peeraddr;
     memset(&peeraddr, 0, sizeof(peeraddr));
@@ -1026,56 +861,3 @@ hio_t* hloop_create_udp_server(hloop_t* loop, const char* host, int port) {
 hio_t* hloop_create_udp_client(hloop_t* loop, const char* host, int port) {
     return hio_create(loop, host, port, SOCK_DGRAM);
 }
-
-// upstream
-void hio_read_upstream(hio_t* io) {
-    hio_t* upstream_io = io->upstream_io;
-    if (upstream_io) {
-        hio_read(io);
-        hio_read(upstream_io);
-    }
-}
-
-void hio_write_upstream(hio_t* io, void* buf, int bytes) {
-    hio_t* upstream_io = io->upstream_io;
-    if (upstream_io) {
-        hio_write(upstream_io, buf, bytes);
-    }
-}
-
-void hio_close_upstream(hio_t* io) {
-    hio_t* upstream_io = io->upstream_io;
-    if (upstream_io) {
-        hio_close(upstream_io);
-    }
-}
-
-void hio_setup_upstream(hio_t* io1, hio_t* io2) {
-    io1->upstream_io = io2;
-    io2->upstream_io = io1;
-    hio_setcb_read(io1, hio_write_upstream);
-    hio_setcb_read(io2, hio_write_upstream);
-}
-
-hio_t* hio_get_upstream(hio_t* io) {
-    return io->upstream_io;
-}
-
-hio_t* hio_setup_tcp_upstream(hio_t* io, const char* host, int port, int ssl) {
-    hio_t* upstream_io = hio_create(io->loop, host, port, SOCK_STREAM);
-    if (upstream_io == NULL) return NULL;
-    if (ssl) hio_enable_ssl(upstream_io);
-    hio_setup_upstream(io, upstream_io);
-    hio_setcb_close(io, hio_close_upstream);
-    hio_setcb_close(upstream_io, hio_close_upstream);
-    hconnect(io->loop, upstream_io->fd, hio_read_upstream);
-    return upstream_io;
-}
-
-hio_t* hio_setup_udp_upstream(hio_t* io, const char* host, int port) {
-    hio_t* upstream_io = hio_create(io->loop, host, port, SOCK_DGRAM);
-    if (upstream_io == NULL) return NULL;
-    hio_setup_upstream(io, upstream_io);
-    hio_read_upstream(io);
-    return upstream_io;
-}

+ 4 - 3
event/nio.c

@@ -514,10 +514,11 @@ enqueue:
         }
         write_queue_push_back(&io->write_queue, &remain);
         io->write_queue_bytes += remain.len - remain.offset;
-        // hlogd("write queue %d, total %u", remain.len - remain.offset, io->write_queue_bytes);
         if (io->write_queue_bytes > WRITE_QUEUE_HIGH_WATER) {
-            hlogw("write queue %d, total %u, over high water %u",
-                    remain.len - remain.offset, io->write_queue_bytes, WRITE_QUEUE_HIGH_WATER);
+            hlogw("write queue %u, total %u, over high water %u",
+                (unsigned int)(remain.len - remain.offset),
+                (unsigned int)io->write_queue_bytes,
+                (unsigned int)WRITE_QUEUE_HIGH_WATER);
         }
     }
     hrecursive_mutex_unlock(&io->write_mutex);