Răsfoiți Sursa

Add flow control for upstream (#316)

ithewei 3 ani în urmă
părinte
comite
1906765be6
2 a modificat fișierele cu 15 adăugiri și 1 ștergeri
  1. 13 1
      event/hevent.c
  2. 2 0
      event/hloop.h

+ 13 - 1
event/hevent.c

@@ -878,10 +878,22 @@ void hio_read_upstream(hio_t* io) {
     }
 }
 
+void hio_read_upstream_on_write_complete(hio_t* io, const void* buf, int writebytes) {
+    hio_t* upstream_io = io->upstream_io;
+    if (upstream_io && hio_write_is_complete(io)) {
+        hio_read(upstream_io);
+    }
+}
+
 void hio_write_upstream(hio_t* io, void* buf, int bytes) {
     hio_t* upstream_io = io->upstream_io;
     if (upstream_io) {
-        hio_write(upstream_io, buf, bytes);
+        int nwrite = hio_write(upstream_io, buf, bytes);
+        // if (!hio_write_is_complete(upstream_io)) {
+        if (nwrite >= 0 && nwrite < bytes) {
+            hio_read_stop(io);
+            hio_setcb_write(upstream_io, hio_read_upstream_on_write_complete);
+        }
     }
 }
 

+ 2 - 0
event/hloop.h

@@ -437,6 +437,8 @@ HV_EXPORT hio_t* hloop_create_udp_client (hloop_t* loop, const char* host, int p
 // hio_read(io)
 // hio_read(io->upstream_io)
 HV_EXPORT void   hio_read_upstream(hio_t* io);
+// on_write(io) -> hio_write_is_complete(io) -> hio_read(io->upstream_io)
+HV_EXPORT void   hio_read_upstream_on_write_complete(hio_t* io, const void* buf, int writebytes);
 // hio_write(io->upstream_io, buf, bytes)
 HV_EXPORT void   hio_write_upstream(hio_t* io, void* buf, int bytes);
 // hio_close(io->upstream_io)