|
|
@@ -675,6 +675,8 @@ void hio_ready(hio_t* io) {
|
|
|
io->heartbeat_interval = 0;
|
|
|
io->heartbeat_fn = NULL;
|
|
|
io->heartbeat_timer = NULL;
|
|
|
+ // upstream
|
|
|
+ io->upstream_io = NULL;
|
|
|
// private:
|
|
|
io->event_index[0] = io->event_index[1] = -1;
|
|
|
io->hovlp = NULL;
|
|
|
@@ -750,7 +752,7 @@ hio_t* hio_get(hloop_t* loop, int fd) {
|
|
|
}
|
|
|
|
|
|
int hio_add(hio_t* io, hio_cb cb, int events) {
|
|
|
- printd("hio_add fd=%d events=%d\n", io->fd, events);
|
|
|
+ printd("hio_add fd=%d io->events=%d events=%d\n", io->fd, io->events, events);
|
|
|
#ifdef OS_WIN
|
|
|
// Windows iowatcher not work on stdio
|
|
|
if (io->fd < 3) return -1;
|
|
|
@@ -769,8 +771,10 @@ int hio_add(hio_t* io, hio_cb cb, int events) {
|
|
|
io->cb = (hevent_cb)cb;
|
|
|
}
|
|
|
|
|
|
- iowatcher_add_event(loop, io->fd, events);
|
|
|
- io->events |= events;
|
|
|
+ if (!(io->events & events)) {
|
|
|
+ iowatcher_add_event(loop, io->fd, events);
|
|
|
+ io->events |= events;
|
|
|
+ }
|
|
|
return 0;
|
|
|
}
|
|
|
|
|
|
@@ -782,7 +786,7 @@ int hio_del(hio_t* io, int events) {
|
|
|
#endif
|
|
|
if (!io->active) return -1;
|
|
|
|
|
|
- if (io->events) {
|
|
|
+ if (io->events & events) {
|
|
|
iowatcher_del_event(io->loop, io->fd, events);
|
|
|
io->events &= ~events;
|
|
|
}
|
|
|
@@ -878,19 +882,7 @@ hio_t* hsendto (hloop_t* loop, int sockfd, const void* buf, size_t len, hwrite_c
|
|
|
return hwrite(loop, sockfd, buf, len, write_cb);
|
|
|
}
|
|
|
|
|
|
-hio_t* hloop_create_tcp_server (hloop_t* loop, const char* host, int port, haccept_cb accept_cb) {
|
|
|
- int listenfd = Listen(port, host);
|
|
|
- if (listenfd < 0) {
|
|
|
- return NULL;
|
|
|
- }
|
|
|
- hio_t* io = haccept(loop, listenfd, accept_cb);
|
|
|
- if (io == NULL) {
|
|
|
- closesocket(listenfd);
|
|
|
- }
|
|
|
- return io;
|
|
|
-}
|
|
|
-
|
|
|
-hio_t* hloop_create_tcp_client (hloop_t* loop, const char* host, int port, hconnect_cb connect_cb) {
|
|
|
+hio_t* hio_create(hloop_t* loop, const char* host, int port, int type) {
|
|
|
sockaddr_u peeraddr;
|
|
|
memset(&peeraddr, 0, sizeof(peeraddr));
|
|
|
int ret = sockaddr_set_ipport(&peeraddr, host, port);
|
|
|
@@ -898,7 +890,7 @@ hio_t* hloop_create_tcp_client (hloop_t* loop, const char* host, int port, hconn
|
|
|
//printf("unknown host: %s\n", host);
|
|
|
return NULL;
|
|
|
}
|
|
|
- int connfd = socket(peeraddr.sa.sa_family, SOCK_STREAM, 0);
|
|
|
+ int connfd = socket(peeraddr.sa.sa_family, type, 0);
|
|
|
if (connfd < 0) {
|
|
|
perror("socket");
|
|
|
return NULL;
|
|
|
@@ -907,11 +899,28 @@ hio_t* hloop_create_tcp_client (hloop_t* loop, const char* host, int port, hconn
|
|
|
hio_t* io = hio_get(loop, connfd);
|
|
|
assert(io != NULL);
|
|
|
hio_set_peeraddr(io, &peeraddr.sa, sockaddr_len(&peeraddr));
|
|
|
- hconnect(loop, connfd, connect_cb);
|
|
|
return io;
|
|
|
}
|
|
|
|
|
|
-// @server: socket -> bind -> hrecvfrom
|
|
|
+hio_t* hloop_create_tcp_server (hloop_t* loop, const char* host, int port, haccept_cb accept_cb) {
|
|
|
+ int listenfd = Listen(port, host);
|
|
|
+ if (listenfd < 0) {
|
|
|
+ return NULL;
|
|
|
+ }
|
|
|
+ hio_t* io = haccept(loop, listenfd, accept_cb);
|
|
|
+ if (io == NULL) {
|
|
|
+ closesocket(listenfd);
|
|
|
+ }
|
|
|
+ return io;
|
|
|
+}
|
|
|
+
|
|
|
+hio_t* hloop_create_tcp_client (hloop_t* loop, const char* host, int port, hconnect_cb connect_cb) {
|
|
|
+ hio_t* io = hio_create(loop, host, port, SOCK_STREAM);
|
|
|
+ if (io == NULL) return NULL;
|
|
|
+ hconnect(loop, io->fd, connect_cb);
|
|
|
+ return io;
|
|
|
+}
|
|
|
+
|
|
|
hio_t* hloop_create_udp_server(hloop_t* loop, const char* host, int port) {
|
|
|
int bindfd = Bind(port, host, SOCK_DGRAM);
|
|
|
if (bindfd < 0) {
|
|
|
@@ -920,24 +929,59 @@ hio_t* hloop_create_udp_server(hloop_t* loop, const char* host, int port) {
|
|
|
return hio_get(loop, bindfd);
|
|
|
}
|
|
|
|
|
|
-// @client: Resolver -> socket -> hio_get -> hio_set_peeraddr
|
|
|
hio_t* hloop_create_udp_client(hloop_t* loop, const char* host, int port) {
|
|
|
- sockaddr_u peeraddr;
|
|
|
- memset(&peeraddr, 0, sizeof(peeraddr));
|
|
|
- int ret = sockaddr_set_ipport(&peeraddr, host, port);
|
|
|
- if (ret != 0) {
|
|
|
- //printf("unknown host: %s\n", host);
|
|
|
- return NULL;
|
|
|
+ return hio_create(loop, host, port, SOCK_DGRAM);
|
|
|
+}
|
|
|
+
|
|
|
+// upstream
|
|
|
+void hio_read_upstream(hio_t* io) {
|
|
|
+ hio_t* upstream_io = io->upstream_io;
|
|
|
+ if (upstream_io) {
|
|
|
+ hio_read(io);
|
|
|
+ hio_read(upstream_io);
|
|
|
}
|
|
|
+}
|
|
|
|
|
|
- int sockfd = socket(peeraddr.sa.sa_family, SOCK_DGRAM, 0);
|
|
|
- if (sockfd < 0) {
|
|
|
- perror("socket");
|
|
|
- return NULL;
|
|
|
+void hio_write_upstream(hio_t* io, void* buf, int bytes) {
|
|
|
+ hio_t* upstream_io = io->upstream_io;
|
|
|
+ if (upstream_io) {
|
|
|
+ hio_write(upstream_io, buf, bytes);
|
|
|
}
|
|
|
+}
|
|
|
|
|
|
- hio_t* io = hio_get(loop, sockfd);
|
|
|
- assert(io != NULL);
|
|
|
- hio_set_peeraddr(io, &peeraddr.sa, sockaddr_len(&peeraddr));
|
|
|
- return io;
|
|
|
+void hio_close_upstream(hio_t* io) {
|
|
|
+ hio_t* upstream_io = io->upstream_io;
|
|
|
+ if (upstream_io) {
|
|
|
+ hio_close(upstream_io);
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+void hio_setup_upstream(hio_t* io1, hio_t* io2) {
|
|
|
+ io1->upstream_io = io2;
|
|
|
+ io2->upstream_io = io1;
|
|
|
+ hio_setcb_read(io1, hio_write_upstream);
|
|
|
+ hio_setcb_read(io2, hio_write_upstream);
|
|
|
+}
|
|
|
+
|
|
|
+hio_t* hio_get_upstream(hio_t* io) {
|
|
|
+ return io->upstream_io;
|
|
|
+}
|
|
|
+
|
|
|
+hio_t* hio_setup_tcp_upstream(hio_t* io, const char* host, int port, int ssl) {
|
|
|
+ hio_t* upstream_io = hio_create(io->loop, host, port, SOCK_STREAM);
|
|
|
+ if (upstream_io == NULL) return NULL;
|
|
|
+ if (ssl) hio_enable_ssl(upstream_io);
|
|
|
+ hio_setup_upstream(io, upstream_io);
|
|
|
+ hio_setcb_close(io, hio_close_upstream);
|
|
|
+ hio_setcb_close(upstream_io, hio_close_upstream);
|
|
|
+ hconnect(io->loop, upstream_io->fd, hio_read_upstream);
|
|
|
+ return upstream_io;
|
|
|
+}
|
|
|
+
|
|
|
+hio_t* hio_setup_udp_upstream(hio_t* io, const char* host, int port) {
|
|
|
+ hio_t* upstream_io = hio_create(io->loop, host, port, SOCK_DGRAM);
|
|
|
+ if (upstream_io == NULL) return NULL;
|
|
|
+ hio_setup_upstream(io, upstream_io);
|
|
|
+ hio_read_upstream(io);
|
|
|
+ return upstream_io;
|
|
|
}
|