|
|
@@ -27,25 +27,18 @@ static int hloop_process_idles(hloop_t* loop) {
|
|
|
hidle_t* idle = NULL;
|
|
|
while (node != &loop->idles) {
|
|
|
idle = IDLE_ENTRY(node);
|
|
|
- if (idle->destroy) goto destroy;
|
|
|
- if (!idle->active) goto next;
|
|
|
- if (idle->repeat == 0) {
|
|
|
- hidle_del(idle);
|
|
|
- //goto next;
|
|
|
- goto destroy;
|
|
|
- }
|
|
|
if (idle->repeat != INFINITE) {
|
|
|
--idle->repeat;
|
|
|
}
|
|
|
+ if (idle->repeat == 0) {
|
|
|
+ hidle_del(idle);
|
|
|
+ }
|
|
|
EVENT_PENDING(idle);
|
|
|
++nidles;
|
|
|
-next:
|
|
|
- node = node->next;
|
|
|
- continue;
|
|
|
-destroy:
|
|
|
node = node->next;
|
|
|
- list_del(node->prev);
|
|
|
- SAFE_FREE(idle);
|
|
|
+ if (!idle->active) {
|
|
|
+ list_del(node->prev);
|
|
|
+ }
|
|
|
}
|
|
|
return nidles;
|
|
|
}
|
|
|
@@ -56,33 +49,29 @@ static int hloop_process_timers(hloop_t* loop) {
|
|
|
uint64_t now_hrtime = hloop_now_hrtime(loop);
|
|
|
while (loop->timers.root) {
|
|
|
timer = TIMER_ENTRY(loop->timers.root);
|
|
|
- if (timer->destroy) goto destroy;
|
|
|
- if (timer->repeat == 0) {
|
|
|
- htimer_del(timer);
|
|
|
- goto destroy;
|
|
|
- }
|
|
|
if (timer->next_timeout > now_hrtime) {
|
|
|
break;
|
|
|
}
|
|
|
if (timer->repeat != INFINITE) {
|
|
|
--timer->repeat;
|
|
|
}
|
|
|
- heap_dequeue(&loop->timers);
|
|
|
- if (timer->event_type == HEVENT_TYPE_TIMEOUT) {
|
|
|
- timer->next_timeout += ((htimeout_t*)timer)->timeout*1000;
|
|
|
- }
|
|
|
- else if (timer->event_type == HEVENT_TYPE_PERIOD) {
|
|
|
- hperiod_t* period = (hperiod_t*)timer;
|
|
|
- timer->next_timeout = calc_next_timeout(period->minute, period->hour, period->day,
|
|
|
- period->week, period->month) * 1e6;
|
|
|
+ if (timer->repeat == 0) {
|
|
|
+ htimer_del(timer);
|
|
|
}
|
|
|
- heap_insert(&loop->timers, &timer->node);
|
|
|
EVENT_PENDING(timer);
|
|
|
++ntimers;
|
|
|
- continue;
|
|
|
-destroy:
|
|
|
heap_dequeue(&loop->timers);
|
|
|
- SAFE_FREE(timer);
|
|
|
+ if (timer->active) {
|
|
|
+ if (timer->event_type == HEVENT_TYPE_TIMEOUT) {
|
|
|
+ timer->next_timeout += ((htimeout_t*)timer)->timeout*1000;
|
|
|
+ }
|
|
|
+ else if (timer->event_type == HEVENT_TYPE_PERIOD) {
|
|
|
+ hperiod_t* period = (hperiod_t*)timer;
|
|
|
+ timer->next_timeout = calc_next_timeout(period->minute, period->hour, period->day,
|
|
|
+ period->week, period->month) * 1e6;
|
|
|
+ }
|
|
|
+ heap_insert(&loop->timers, &timer->node);
|
|
|
+ }
|
|
|
}
|
|
|
return ntimers;
|
|
|
}
|
|
|
@@ -104,7 +93,7 @@ static int hloop_process_pendings(hloop_t* loop) {
|
|
|
for (int i = HEVENT_PRIORITY_SIZE-1; i >= 0; --i) {
|
|
|
next = loop->pendings[i];
|
|
|
while (next) {
|
|
|
- if (next->active && next->pending && next->cb) {
|
|
|
+ if (next->pending && next->cb) {
|
|
|
next->cb(next);
|
|
|
++ncbs;
|
|
|
}
|
|
|
@@ -112,6 +101,9 @@ static int hloop_process_pendings(hloop_t* loop) {
|
|
|
next = next->pending_next;
|
|
|
prev->pending = 0;
|
|
|
prev->pending_next = NULL;
|
|
|
+ if (prev->destroy) {
|
|
|
+ SAFE_FREE(prev);
|
|
|
+ }
|
|
|
}
|
|
|
loop->pendings[i] = NULL;
|
|
|
}
|
|
|
@@ -128,9 +120,9 @@ static int hloop_process_events(hloop_t* loop) {
|
|
|
hloop_update_time(loop);
|
|
|
if (loop->timers.root) {
|
|
|
uint64_t next_min_timeout = TIMER_ENTRY(loop->timers.root)->next_timeout;
|
|
|
- blocktime = next_min_timeout - hloop_now_hrtime(loop);
|
|
|
- if (blocktime <= 0) goto process_timers;
|
|
|
- blocktime /= 1000;
|
|
|
+ int64_t blocktime_us = next_min_timeout - hloop_now_hrtime(loop);
|
|
|
+ if (blocktime_us <= 0) goto process_timers;
|
|
|
+ blocktime = blocktime_us / 1000;
|
|
|
++blocktime;
|
|
|
blocktime = MIN(blocktime, MAX_BLOCK_TIME);
|
|
|
}
|
|
|
@@ -153,8 +145,10 @@ process_timers:
|
|
|
nidles= hloop_process_idles(loop);
|
|
|
}
|
|
|
}
|
|
|
- //printd("blocktime=%d nios=%d ntimers=%d nidles=%d nactives=%d npendings=%d\n", blocktime, nios, ntimers, nidles, loop->nactives, loop->npendings);
|
|
|
- return hloop_process_pendings(loop);
|
|
|
+ int ncbs = hloop_process_pendings(loop);
|
|
|
+ printd("blocktime=%d nios=%d ntimers=%d nidles=%d nactives=%d npendings=%d ncbs=%d\n",
|
|
|
+ blocktime, nios, ntimers, nidles, loop->nactives, loop->npendings, ncbs);
|
|
|
+ return ncbs;
|
|
|
}
|
|
|
|
|
|
int hloop_init(hloop_t* loop) {
|
|
|
@@ -204,6 +198,9 @@ void hloop_cleanup(hloop_t* loop) {
|
|
|
for (int i = 0; i < loop->ios.maxsize; ++i) {
|
|
|
hio_t* io = loop->ios.ptr[i];
|
|
|
if (io) {
|
|
|
+ if (!(io->io_type&HIO_TYPE_STDIO)) {
|
|
|
+ hclose(io);
|
|
|
+ }
|
|
|
hio_free(io);
|
|
|
}
|
|
|
}
|
|
|
@@ -263,7 +260,7 @@ hidle_t* hidle_add(hloop_t* loop, hidle_cb cb, uint32_t repeat) {
|
|
|
}
|
|
|
|
|
|
void hidle_del(hidle_t* idle) {
|
|
|
- if (idle->destroy) return;
|
|
|
+ if (!idle->active) return;
|
|
|
idle->loop->nidles--;
|
|
|
EVENT_DEL(idle);
|
|
|
}
|
|
|
@@ -319,9 +316,11 @@ htimer_t* htimer_add_period(hloop_t* loop, htimer_cb cb,
|
|
|
}
|
|
|
|
|
|
void htimer_del(htimer_t* timer) {
|
|
|
- if (timer->destroy) return;
|
|
|
+ if (!timer->active) return;
|
|
|
timer->loop->ntimers--;
|
|
|
EVENT_DEL(timer);
|
|
|
+ // NOTE: set timer->next_timeout to handle at next loop
|
|
|
+ timer->next_timeout = hloop_now_hrtime(timer->loop);
|
|
|
}
|
|
|
|
|
|
void hio_init(hio_t* io) {
|
|
|
@@ -332,7 +331,70 @@ void hio_init(hio_t* io) {
|
|
|
//write_queue_init(&io->write_queue, 4);;
|
|
|
}
|
|
|
|
|
|
+static void fill_io_type(hio_t* io) {
|
|
|
+ int type = 0;
|
|
|
+ socklen_t optlen = sizeof(int);
|
|
|
+ int ret = getsockopt(io->fd, SOL_SOCKET, SO_TYPE, (char*)&type, &optlen);
|
|
|
+ printd("getsockopt SO_TYPE fd=%d ret=%d type=%d errno=%d\n", io->fd, ret, type, socket_errno());
|
|
|
+ if (ret == 0) {
|
|
|
+ switch (type) {
|
|
|
+ case SOCK_STREAM: io->io_type = HIO_TYPE_TCP; break;
|
|
|
+ case SOCK_DGRAM: io->io_type = HIO_TYPE_UDP; break;
|
|
|
+ case SOCK_RAW: io->io_type = HIO_TYPE_IP; break;
|
|
|
+ default: io->io_type = HIO_TYPE_SOCKET; break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ else if (socket_errno() == ENOTSOCK) {
|
|
|
+ switch (io->fd) {
|
|
|
+ case 0: io->io_type = HIO_TYPE_STDIN; break;
|
|
|
+ case 1: io->io_type = HIO_TYPE_STDOUT; break;
|
|
|
+ case 2: io->io_type = HIO_TYPE_STDERR; break;
|
|
|
+ default: io->io_type = HIO_TYPE_FILE; break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+void hio_socket_init(hio_t* io) {
|
|
|
+ // nonblocking
|
|
|
+ nonblocking(io->fd);
|
|
|
+ // fill io->localaddr io->peeraddr
|
|
|
+ if (io->localaddr == NULL) {
|
|
|
+ SAFE_ALLOC(io->localaddr, sizeof(struct sockaddr_in6));
|
|
|
+ }
|
|
|
+ if (io->peeraddr == NULL) {
|
|
|
+ io->peeraddrlen = sizeof(struct sockaddr_in6);
|
|
|
+ SAFE_ALLOC(io->peeraddr, sizeof(struct sockaddr_in6));
|
|
|
+ }
|
|
|
+ socklen_t addrlen = sizeof(struct sockaddr_in6);
|
|
|
+ int ret = getsockname(io->fd, io->localaddr, &addrlen);
|
|
|
+ printd("getsockname fd=%d ret=%d errno=%d\n", io->fd, ret, socket_errno());
|
|
|
+ // NOTE:
|
|
|
+ // tcp_server peeraddr set by accept
|
|
|
+ // udp_server peeraddr set by recvfrom
|
|
|
+ // tcp_client/udp_client peeraddr set by hio_setpeeraddr
|
|
|
+ if (io->io_type == HIO_TYPE_TCP) {
|
|
|
+ // tcp acceptfd
|
|
|
+ addrlen = sizeof(struct sockaddr_in6);
|
|
|
+ ret = getpeername(io->fd, io->peeraddr, &addrlen);
|
|
|
+ printd("getpeername fd=%d ret=%d errno=%d\n", io->fd, ret, socket_errno());
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
void hio_reset(hio_t* io) {
|
|
|
+ fill_io_type(io);
|
|
|
+ if (io->io_type & HIO_TYPE_SOCKET) {
|
|
|
+ hio_socket_init(io);
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+void hio_deinit(hio_t* io) {
|
|
|
+ offset_buf_t* pbuf = NULL;
|
|
|
+ while (!write_queue_empty(&io->write_queue)) {
|
|
|
+ pbuf = write_queue_front(&io->write_queue);
|
|
|
+ SAFE_FREE(pbuf->base);
|
|
|
+ write_queue_pop_front(&io->write_queue);
|
|
|
+ }
|
|
|
+ write_queue_cleanup(&io->write_queue);
|
|
|
io->closed = 0;
|
|
|
io->accept = io->connect = io->connectex = 0;
|
|
|
io->recv = io->send = 0;
|
|
|
@@ -349,16 +411,6 @@ void hio_reset(hio_t* io) {
|
|
|
io->hovlp = NULL;
|
|
|
}
|
|
|
|
|
|
-void hio_deinit(hio_t* io) {
|
|
|
- offset_buf_t* pbuf = NULL;
|
|
|
- while (!write_queue_empty(&io->write_queue)) {
|
|
|
- pbuf = write_queue_front(&io->write_queue);
|
|
|
- SAFE_FREE(pbuf->base);
|
|
|
- write_queue_pop_front(&io->write_queue);
|
|
|
- }
|
|
|
- write_queue_cleanup(&io->write_queue);
|
|
|
-}
|
|
|
-
|
|
|
void hio_free(hio_t* io) {
|
|
|
if (io == NULL) return;
|
|
|
hio_deinit(io);
|
|
|
@@ -367,52 +419,6 @@ void hio_free(hio_t* io) {
|
|
|
SAFE_FREE(io);
|
|
|
}
|
|
|
|
|
|
-static void fill_io_type(hio_t* io) {
|
|
|
- int type = 0;
|
|
|
- socklen_t optlen = sizeof(int);
|
|
|
- int ret = getsockopt(io->fd, SOL_SOCKET, SO_TYPE, (char*)&type, &optlen);
|
|
|
- printd("getsockopt SO_TYPE fd=%d ret=%d type=%d errno=%d\n", io->fd, ret, type, socket_errno());
|
|
|
- if (ret == 0) {
|
|
|
- switch (type) {
|
|
|
- case SOCK_STREAM: io->io_type = HIO_TYPE_TCP; break;
|
|
|
- case SOCK_DGRAM: io->io_type = HIO_TYPE_UDP; break;
|
|
|
- case SOCK_RAW: io->io_type = HIO_TYPE_IP; break;
|
|
|
- default: io->io_type = HIO_TYPE_SOCKET; break;
|
|
|
- }
|
|
|
- // nonblocking
|
|
|
- nonblocking(io->fd);
|
|
|
- // fill io->localaddr io->peeraddr
|
|
|
- if (io->localaddr == NULL) {
|
|
|
- SAFE_ALLOC(io->localaddr, sizeof(struct sockaddr_in6));
|
|
|
- }
|
|
|
- if (io->peeraddr == NULL) {
|
|
|
- io->peeraddrlen = sizeof(struct sockaddr_in6);
|
|
|
- SAFE_ALLOC(io->peeraddr, sizeof(struct sockaddr_in6));
|
|
|
- }
|
|
|
- socklen_t addrlen = sizeof(struct sockaddr_in6);
|
|
|
- ret = getsockname(io->fd, io->localaddr, &addrlen);
|
|
|
- printd("getsockname fd=%d ret=%d errno=%d\n", io->fd, ret, socket_errno());
|
|
|
- // NOTE:
|
|
|
- // tcp_server peeraddr set by accept
|
|
|
- // udp_server peeraddr set by recvfrom
|
|
|
- // tcp_client/udp_client peeraddr set by hio_setpeeraddr
|
|
|
- if (io->io_type == HIO_TYPE_TCP) {
|
|
|
- // tcp acceptfd
|
|
|
- addrlen = sizeof(struct sockaddr_in6);
|
|
|
- ret = getpeername(io->fd, io->peeraddr, &addrlen);
|
|
|
- printd("getpeername fd=%d ret=%d errno=%d\n", io->fd, ret, socket_errno());
|
|
|
- }
|
|
|
- }
|
|
|
- else if (socket_errno() == ENOTSOCK) {
|
|
|
- switch (io->fd) {
|
|
|
- case 0: io->io_type = HIO_TYPE_STDIN; break;
|
|
|
- case 1: io->io_type = HIO_TYPE_STDOUT; break;
|
|
|
- case 2: io->io_type = HIO_TYPE_STDERR; break;
|
|
|
- default: io->io_type = HIO_TYPE_FILE; break;
|
|
|
- }
|
|
|
- }
|
|
|
-}
|
|
|
-
|
|
|
hio_t* hio_get(hloop_t* loop, int fd) {
|
|
|
if (loop->ios.maxsize == 0) {
|
|
|
io_array_init(&loop->ios, IO_ARRAY_INIT_SIZE);
|
|
|
@@ -432,16 +438,6 @@ hio_t* hio_get(hloop_t* loop, int fd) {
|
|
|
loop->ios.ptr[fd] = io;
|
|
|
}
|
|
|
|
|
|
- if (io->destroy) {
|
|
|
- io->destroy = 0;
|
|
|
- hio_reset(io);
|
|
|
- }
|
|
|
-
|
|
|
- if (io->io_type == HIO_TYPE_UNKNOWN) {
|
|
|
- // NOTE: fill io_type: this is important
|
|
|
- fill_io_type(io);
|
|
|
- }
|
|
|
-
|
|
|
return io;
|
|
|
}
|
|
|
|
|
|
@@ -449,12 +445,9 @@ int hio_add(hio_t* io, hio_cb cb, int events) {
|
|
|
printd("hio_add fd=%d events=%d\n", io->fd, events);
|
|
|
hloop_t* loop = io->loop;
|
|
|
if (!io->active) {
|
|
|
+ hio_reset(io);
|
|
|
EVENT_ADD(loop, io, cb);
|
|
|
loop->nios++;
|
|
|
- // NOTE: fill io_type: this is important
|
|
|
- if (io->io_type == HIO_TYPE_UNKNOWN) {
|
|
|
- fill_io_type(io);
|
|
|
- }
|
|
|
}
|
|
|
|
|
|
if (cb) {
|
|
|
@@ -468,12 +461,13 @@ int hio_add(hio_t* io, hio_cb cb, int events) {
|
|
|
|
|
|
int hio_del(hio_t* io, int events) {
|
|
|
printd("hio_del fd=%d io->events=%d events=%d\n", io->fd, io->events, events);
|
|
|
- if (io->destroy) return 0;
|
|
|
+ if (!io->active) return 0;
|
|
|
iowatcher_del_event(io->loop, io->fd, events);
|
|
|
io->events &= ~events;
|
|
|
if (io->events == 0) {
|
|
|
io->loop->nios--;
|
|
|
- EVENT_DEL(io);
|
|
|
+ // NOTE: not EVENT_DEL, avoid free
|
|
|
+ EVENT_INACTIVE(io);
|
|
|
hio_deinit(io);
|
|
|
}
|
|
|
return 0;
|
|
|
@@ -587,6 +581,7 @@ hio_t* hrecv (hloop_t* loop, int connfd, void* buf, size_t len, hread_cb read_cb
|
|
|
hio_t* io = hio_get(loop, connfd);
|
|
|
if (io == NULL) return NULL;
|
|
|
io->recv = 1;
|
|
|
+ io->io_type = HIO_TYPE_TCP;
|
|
|
return hread(loop, connfd, buf, len, read_cb);
|
|
|
}
|
|
|
|
|
|
@@ -594,6 +589,7 @@ hio_t* hsend (hloop_t* loop, int connfd, const void* buf, size_t len, hwrite_cb
|
|
|
hio_t* io = hio_get(loop, connfd);
|
|
|
if (io == NULL) return NULL;
|
|
|
io->send = 1;
|
|
|
+ io->io_type = HIO_TYPE_TCP;
|
|
|
return hwrite(loop, connfd, buf, len, write_cb);
|
|
|
}
|
|
|
|
|
|
@@ -601,6 +597,7 @@ hio_t* hrecvfrom (hloop_t* loop, int sockfd, void* buf, size_t len, hread_cb rea
|
|
|
hio_t* io = hio_get(loop, sockfd);
|
|
|
if (io == NULL) return NULL;
|
|
|
io->recvfrom = 1;
|
|
|
+ io->io_type = HIO_TYPE_UDP;
|
|
|
return hread(loop, sockfd, buf, len, read_cb);
|
|
|
}
|
|
|
|
|
|
@@ -608,6 +605,7 @@ hio_t* hsendto (hloop_t* loop, int sockfd, const void* buf, size_t len, hwrite_c
|
|
|
hio_t* io = hio_get(loop, sockfd);
|
|
|
if (io == NULL) return NULL;
|
|
|
io->sendto = 1;
|
|
|
+ io->io_type = HIO_TYPE_UDP;
|
|
|
return hwrite(loop, sockfd, buf, len, write_cb);
|
|
|
}
|
|
|
|