nio.cpp 6.3 KB


  1. #include "hloop.h"
  2. #include "hio.h"
  3. #include "hsocket.h"
  4. static void on_accept(hio_t* io, void* userdata) {
  5. //printf("on_accept listenfd=%d\n", io->fd);
  6. struct sockaddr_in peeraddr;
  7. socklen_t addrlen;
  8. //struct sockaddr_in localaddr;
  9. //addrlen = sizeof(struct sockaddr_in);
  10. //getsockname(io->fd, (struct sockaddr*)&localaddr, &addrlen);
  11. accept:
  12. addrlen = sizeof(struct sockaddr_in);
  13. int connfd = accept(io->fd, (struct sockaddr*)&peeraddr, &addrlen);
  14. if (connfd < 0) {
  15. if (sockerrno == NIO_EAGAIN) {
  16. //goto accept_done;
  17. return;
  18. }
  19. else {
  20. perror("accept");
  21. goto accept_error;
  22. }
  23. }
  24. //printf("accept connfd=%d [%s:%d] <= [%s:%d]\n", connfd,
  25. //inet_ntoa(localaddr.sin_addr), ntohs(localaddr.sin_port),
  26. //inet_ntoa(peeraddr.sin_addr), ntohs(peeraddr.sin_port));
  27. if (io->accept_cb) {
  28. io->accept_cb(io, connfd, io->accept_userdata);
  29. }
  30. goto accept;
  31. accept_error:
  32. hclose(io);
  33. }
  34. static void on_connect(hio_t* io, void* userdata) {
  35. //printf("on_connect connfd=%d\n", io->fd);
  36. int state = 0;
  37. struct sockaddr_in peeraddr;
  38. socklen_t addrlen;
  39. addrlen = sizeof(struct sockaddr_in);
  40. int ret = getpeername(io->fd, (struct sockaddr*)&peeraddr, &addrlen);
  41. if (ret < 0) {
  42. //printf("connect failed: %s: %d\n", strerror(sockerrno), sockerrno);
  43. state = 0;
  44. }
  45. else {
  46. //struct sockaddr_in localaddr;
  47. //addrlen = sizeof(struct sockaddr_in);
  48. //getsockname(ioent->fd, (struct sockaddr*)&localaddr, &addrlen);
  49. //printf("connect connfd=%d [%s:%d] => [%s:%d]\n", io->fd,
  50. //inet_ntoa(localaddr.sin_addr), ntohs(localaddr.sin_port),
  51. //inet_ntoa(peeraddr.sin_addr), ntohs(peeraddr.sin_port));
  52. state = 1;
  53. }
  54. if (io->connect_cb) {
  55. io->connect_cb(io, state, io->connect_userdata);
  56. }
  57. }
  58. static void on_readable(hio_t* io, void* userdata) {
  59. //printf("on_read fd=%d\n", io->fd);
  60. int nread;
  61. void* buf = io->readbuf;
  62. int len = io->readbuflen;
  63. read:
  64. memset(buf, 0, len);
  65. nread = read(io->fd, buf, len);
  66. //printf("read retval=%d\n", nread);
  67. if (nread < 0) {
  68. if (sockerrno == NIO_EAGAIN) {
  69. //goto read_done;
  70. return;
  71. }
  72. else {
  73. perror("read");
  74. goto read_error;
  75. }
  76. }
  77. if (nread == 0) {
  78. goto disconnect;
  79. }
  80. //printf("> %s\n", buf);
  81. if (io->read_cb) {
  82. io->read_cb(io, io->readbuf, nread, io->read_userdata);
  83. }
  84. if (nread == len) {
  85. goto read;
  86. }
  87. return;
  88. read_error:
  89. disconnect:
  90. hclose(io);
  91. }
  92. static void on_writeable(hio_t* io, void* userdata) {
  93. printf("on_write fd=%d\n", io->fd);
  94. /*
  95. int nwrite;
  96. write:
  97. if (io->write_queue.empty()) {
  98. return;
  99. }
  100. pbuf = io->write_queue.front();
  101. nwrite = write(ioent->fd, buf, len);
  102. if (nwrite < 0) {
  103. if (nwrite == NIO_EAGAIN) {
  104. //goto write_done;
  105. return;
  106. }
  107. else {
  108. }
  109. }
  110. if (io->write_cb) {
  111. io->write_cb(io, nwrite);
  112. }
  113. if (nwrite == len) {
  114. io->write_queue.pop_front();
  115. goto write;
  116. }
  117. //pbuf->buf += nwrite;
  118. return;
  119. write_error:
  120. disconnect:
  121. hclose(ioent);
  122. */
  123. }
  124. hio_t* haccept (hloop_t* loop, int listenfd, haccept_cb accept_cb, void* accept_userdata,
  125. hclose_cb close_cb, void* close_userdata) {
  126. hio_t* io = hio_accept(loop, listenfd, on_accept, NULL);
  127. if (io) {
  128. io->accept_cb = accept_cb;
  129. io->accept_userdata = accept_userdata;
  130. if (close_cb) {
  131. io->close_cb = close_cb;
  132. }
  133. if (close_userdata) {
  134. io->close_userdata = close_userdata;
  135. }
  136. }
  137. return io;
  138. }
  139. hio_t* hconnect (hloop_t* loop, int connfd, hconnect_cb connect_cb, void* connect_userdata,
  140. hclose_cb close_cb, void* close_userdata) {
  141. hio_t* io = hio_connect(loop, connfd, on_connect, NULL);
  142. if (io) {
  143. io->connect_cb = connect_cb;
  144. io->connect_userdata = connect_userdata;
  145. if (close_cb) {
  146. io->close_cb = close_cb;
  147. }
  148. if (close_userdata) {
  149. io->close_userdata = close_userdata;
  150. }
  151. }
  152. return io;
  153. }
  154. hio_t* hread (hloop_t* loop, int fd, void* readbuf, size_t readbuflen, hread_cb read_cb, void* read_userdata,
  155. hclose_cb close_cb, void* close_userdata) {
  156. hio_t* io = hio_read(loop, fd, on_readable, NULL);
  157. if (io) {
  158. io->readbuf = (char*)readbuf;
  159. io->readbuflen = readbuflen;
  160. io->read_cb = read_cb;
  161. io->read_userdata = read_userdata;
  162. if (close_cb) {
  163. io->close_cb = close_cb;
  164. }
  165. if (close_userdata) {
  166. io->close_userdata = close_userdata;
  167. }
  168. }
  169. return io;
  170. }
  171. hio_t* hwrite (hloop_t* loop, int fd, const void* buf, size_t len, hwrite_cb write_cb, void* write_userdata,
  172. hclose_cb close_cb, void* close_userdata) {
  173. hio_t* io = hio_add(loop, fd);
  174. if (io == NULL) return NULL;
  175. io->write_cb = write_cb;
  176. io->write_userdata = write_userdata;
  177. if (close_cb) {
  178. io->close_cb = close_cb;
  179. }
  180. if (close_userdata) {
  181. io->close_userdata = close_userdata;
  182. }
  183. int nwrite;
  184. if (1) {
  185. //if (io->write_queue.empty()) {
  186. try_write:
  187. nwrite = write(fd, buf, len);
  188. if (nwrite < 0) {
  189. if (sockerrno == NIO_EAGAIN) {
  190. nwrite = 0;
  191. goto push_queue;
  192. }
  193. else {
  194. perror("write");
  195. goto write_error;
  196. }
  197. goto write_error;
  198. }
  199. if (nwrite == 0) {
  200. goto disconnect;
  201. }
  202. if (write_cb) {
  203. write_cb(io, buf, nwrite, io->write_userdata);
  204. }
  205. if (nwrite == len) {
  206. //goto write_done;
  207. return io;
  208. }
  209. }
  210. push_queue:
  211. printf("write retval=%d buflen=%ld\n", nwrite, len);
  212. //ioent->write_queue.push(buf+nwrite, len-nwrite);
  213. //hioent_write(loop, fd, on_writeable, NULL);
  214. return io;
  215. write_error:
  216. disconnect:
  217. hclose(io);
  218. return io;
  219. }
  220. void hclose(hio_t* io) {
  221. //printf("close fd=%d\n", io->fd);
  222. close(io->fd);
  223. if (io->close_cb) {
  224. io->close_cb(io, io->close_userdata);
  225. }
  226. hio_del(io);
  227. }