nio.c 8.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331
  1. #include "iowatcher.h"
  2. #ifndef EVENT_IOCP
  3. #include "hsocket.h"
  4. static void nio_accept(hio_t* io) {
  5. //printd("nio_accept listenfd=%d\n", io->fd);
  6. socklen_t addrlen;
  7. if (io->localaddr == NULL) {
  8. io->localaddr = (struct sockaddr*)malloc(sizeof(struct sockaddr_in));
  9. }
  10. if (io->peeraddr == NULL) {
  11. io->peeraddr = (struct sockaddr*)malloc(sizeof(struct sockaddr_in));
  12. }
  13. accept:
  14. addrlen = sizeof(struct sockaddr_in);
  15. int connfd = accept(io->fd, io->peeraddr, &addrlen);
  16. if (connfd < 0) {
  17. if (sockerrno == NIO_EAGAIN) {
  18. //goto accept_done;
  19. return;
  20. }
  21. else {
  22. io->error = sockerrno;
  23. perror("accept");
  24. goto accept_error;
  25. }
  26. }
  27. addrlen = sizeof(struct sockaddr_in);
  28. getsockname(connfd, io->localaddr, &addrlen);
  29. if (io->accept_cb) {
  30. struct sockaddr_in* localaddr = (struct sockaddr_in*)io->localaddr;
  31. struct sockaddr_in* peeraddr = (struct sockaddr_in*)io->peeraddr;
  32. char localip[64];
  33. char peerip[64];
  34. inet_ntop(AF_INET, &localaddr->sin_addr, localip, sizeof(localip));
  35. inet_ntop(AF_INET, &peeraddr->sin_addr, peerip, sizeof(peerip));
  36. printd("accept listenfd=%d connfd=%d [%s:%d] <= [%s:%d]\n", io->fd, connfd,
  37. localip, ntohs(localaddr->sin_port),
  38. peerip, ntohs(peeraddr->sin_port));
  39. printd("accept_cb------\n");
  40. io->accept_cb(io, connfd);
  41. printd("accept_cb======\n");
  42. }
  43. goto accept;
  44. accept_error:
  45. hclose(io);
  46. }
  47. static void nio_connect(hio_t* io) {
  48. //printd("nio_connect connfd=%d\n", io->fd);
  49. int state = 0;
  50. socklen_t addrlen;
  51. if (io->localaddr == NULL) {
  52. io->localaddr = (struct sockaddr*)malloc(sizeof(struct sockaddr_in));
  53. addrlen = sizeof(struct sockaddr_in);
  54. getsockname(io->fd, io->localaddr, &addrlen);
  55. }
  56. if (io->peeraddr == NULL) {
  57. io->peeraddr = (struct sockaddr*)malloc(sizeof(struct sockaddr_in));
  58. }
  59. addrlen = sizeof(struct sockaddr_in);
  60. int ret = getpeername(io->fd, io->peeraddr, &addrlen);
  61. if (ret < 0) {
  62. io->error = sockerrno;
  63. printd("connect failed: %s: %d\n", strerror(sockerrno), sockerrno);
  64. state = 0;
  65. }
  66. else {
  67. struct sockaddr_in* localaddr = (struct sockaddr_in*)io->localaddr;
  68. struct sockaddr_in* peeraddr = (struct sockaddr_in*)io->peeraddr;
  69. char localip[64];
  70. char peerip[64];
  71. inet_ntop(AF_INET, &localaddr->sin_addr, localip, sizeof(localip));
  72. inet_ntop(AF_INET, &peeraddr->sin_addr, peerip, sizeof(peerip));
  73. printd("connect connfd=%d [%s:%d] => [%s:%d]\n", io->fd,
  74. localip, ntohs(localaddr->sin_port),
  75. peerip, ntohs(peeraddr->sin_port));
  76. state = 1;
  77. }
  78. if (io->connect_cb) {
  79. printd("connect_cb------\n");
  80. io->connect_cb(io, state);
  81. printd("connect_cb======\n");
  82. }
  83. if (state == 0) {
  84. hclose(io);
  85. }
  86. }
  87. static void nio_read(hio_t* io) {
  88. //printd("nio_read fd=%d\n", io->fd);
  89. int nread;
  90. void* buf = io->readbuf.base;
  91. int len = io->readbuf.len;
  92. read:
  93. memset(buf, 0, len);
  94. #ifdef OS_UNIX
  95. nread = read(io->fd, buf, len);
  96. #else
  97. nread = recv(io->fd, buf, len, 0);
  98. #endif
  99. //printd("read retval=%d\n", nread);
  100. if (nread < 0) {
  101. if (sockerrno == NIO_EAGAIN) {
  102. //goto read_done;
  103. return;
  104. }
  105. else {
  106. io->error = sockerrno;
  107. perror("read");
  108. goto read_error;
  109. }
  110. }
  111. if (nread == 0) {
  112. goto disconnect;
  113. }
  114. //printd("> %s\n", buf);
  115. if (io->read_cb) {
  116. printd("read_cb------\n");
  117. io->read_cb(io, buf, nread);
  118. printd("read_cb======\n");
  119. }
  120. if (nread == len) {
  121. goto read;
  122. }
  123. return;
  124. read_error:
  125. disconnect:
  126. hclose(io);
  127. }
  128. static void nio_write(hio_t* io) {
  129. //printd("nio_write fd=%d\n", io->fd);
  130. int nwrite = 0;
  131. write:
  132. if (write_queue_empty(&io->write_queue)) {
  133. return;
  134. }
  135. offset_buf_t* pbuf = write_queue_front(&io->write_queue);
  136. char* buf = pbuf->base + pbuf->offset;
  137. int len = pbuf->len - pbuf->offset;
  138. #ifdef OS_UNIX
  139. nwrite = write(io->fd, buf, len);
  140. #else
  141. nwrite = send(io->fd, buf, len, 0);
  142. #endif
  143. //printd("write retval=%d\n", nwrite);
  144. if (nwrite < 0) {
  145. if (sockerrno == NIO_EAGAIN) {
  146. //goto write_done;
  147. return;
  148. }
  149. else {
  150. io->error = sockerrno;
  151. perror("write");
  152. goto write_error;
  153. }
  154. }
  155. if (nwrite == 0) {
  156. goto disconnect;
  157. }
  158. if (io->write_cb) {
  159. printd("write_cb------\n");
  160. io->write_cb(io, buf, nwrite);
  161. printd("write_cb======\n");
  162. }
  163. pbuf->offset += nwrite;
  164. if (nwrite == len) {
  165. SAFE_FREE(pbuf->base);
  166. write_queue_pop_front(&io->write_queue);
  167. // write next
  168. goto write;
  169. }
  170. return;
  171. write_error:
  172. disconnect:
  173. hclose(io);
  174. }
  175. static void hio_handle_events(hio_t* io) {
  176. if ((io->events & READ_EVENT) && (io->revents & READ_EVENT)) {
  177. if (io->accept) {
  178. nio_accept(io);
  179. }
  180. else {
  181. nio_read(io);
  182. }
  183. }
  184. if ((io->events & WRITE_EVENT) && (io->revents & WRITE_EVENT)) {
  185. if (io->connect) {
  186. // NOTE: connect just do once
  187. // ONESHOT
  188. hio_del(io, WRITE_EVENT);
  189. io->connect = 0;
  190. nio_connect(io);
  191. }
  192. else {
  193. nio_write(io);
  194. // NOTE: del WRITE_EVENT, if write_queue empty
  195. if (write_queue_empty(&io->write_queue)) {
  196. hio_del(io, WRITE_EVENT);
  197. }
  198. }
  199. }
  200. io->revents = 0;
  201. }
  202. hio_t* haccept (hloop_t* loop, int listenfd, haccept_cb accept_cb) {
  203. hio_t* io = hio_add(loop, hio_handle_events, listenfd, READ_EVENT);
  204. if (io == NULL) return NULL;
  205. if (accept_cb) {
  206. io->accept_cb = accept_cb;
  207. }
  208. io->accept = 1;
  209. nonblocking(listenfd);
  210. return io;
  211. }
  212. hio_t* hconnect (hloop_t* loop, const char* host, int port, hconnect_cb connect_cb) {
  213. int connfd = Connect(host, port, 1);
  214. if (connfd < 0) {
  215. return NULL;
  216. }
  217. hio_t* io = hio_add(loop, hio_handle_events, connfd, WRITE_EVENT);
  218. if (io == NULL) {
  219. closesocket(connfd);
  220. return NULL;
  221. }
  222. if (connect_cb) {
  223. io->connect_cb = connect_cb;
  224. }
  225. io->connect = 1;
  226. nonblocking(connfd);
  227. return io;
  228. }
  229. hio_t* hread (hloop_t* loop, int fd, void* buf, size_t len, hread_cb read_cb) {
  230. hio_t* io = hio_add(loop, hio_handle_events, fd, READ_EVENT);
  231. if (io == NULL) return NULL;
  232. io->readbuf.base = (char*)buf;
  233. io->readbuf.len = len;
  234. if (read_cb) {
  235. io->read_cb = read_cb;
  236. }
  237. return io;
  238. }
  239. hio_t* hwrite (hloop_t* loop, int fd, const void* buf, size_t len, hwrite_cb write_cb) {
  240. hio_t* io = hio_add(loop, hio_handle_events, fd, 0);
  241. if (io == NULL) return NULL;
  242. if (write_cb) {
  243. io->write_cb = write_cb;
  244. }
  245. int nwrite = 0;
  246. if (write_queue_empty(&io->write_queue)) {
  247. try_write:
  248. #ifdef OS_UNIX
  249. nwrite = write(fd, buf, len);
  250. #else
  251. nwrite = send(fd, buf, len, 0);
  252. #endif
  253. //printd("write retval=%d\n", nwrite);
  254. if (nwrite < 0) {
  255. if (sockerrno == NIO_EAGAIN) {
  256. nwrite = 0;
  257. goto enqueue;
  258. }
  259. else {
  260. perror("write");
  261. io->error = sockerrno;
  262. goto write_error;
  263. }
  264. }
  265. if (nwrite == 0) {
  266. goto disconnect;
  267. }
  268. if (write_cb) {
  269. printd("try_write_cb------\n");
  270. write_cb(io, buf, nwrite);
  271. printd("try_write_cb======\n");
  272. }
  273. if (nwrite == len) {
  274. //goto write_done;
  275. return io;
  276. }
  277. hio_add(loop, hio_handle_events, fd, WRITE_EVENT);
  278. }
  279. enqueue:
  280. if (nwrite < len) {
  281. offset_buf_t rest;
  282. rest.len = len;
  283. rest.offset = nwrite;
  284. // NOTE: free in nio_write;
  285. rest.base = (char*)malloc(rest.len);
  286. if (rest.base == NULL) return io;
  287. memcpy(rest.base, (char*)buf, rest.len);
  288. if (io->write_queue.maxsize == 0) {
  289. write_queue_init(&io->write_queue, 4);
  290. }
  291. write_queue_push_back(&io->write_queue, &rest);
  292. }
  293. return io;
  294. write_error:
  295. disconnect:
  296. hclose(io);
  297. return io;
  298. }
  299. void hclose (hio_t* io) {
  300. //printd("close fd=%d\n", io->fd);
  301. if (io->closed) return;
  302. #ifdef OS_UNIX
  303. close(io->fd);
  304. #else
  305. closesocket(io->fd);
  306. #endif
  307. io->closed = 1;
  308. if (io->close_cb) {
  309. printd("close_cb------\n");
  310. io->close_cb(io);
  311. printd("close_cb======\n");
  312. }
  313. hio_del(io, ALL_EVENTS);
  314. }
  315. #endif