1
0

nio.c 8.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323
  1. #include "iowatcher.h"
  2. #ifndef EVENT_IOCP
  3. #include "hsocket.h"
  4. static void nio_accept(hio_t* io) {
  5. //printd("nio_accept listenfd=%d\n", io->fd);
  6. socklen_t addrlen;
  7. if (io->localaddr == NULL) {
  8. SAFE_ALLOC(io->localaddr, sizeof(struct sockaddr_in6));
  9. }
  10. if (io->peeraddr == NULL) {
  11. SAFE_ALLOC(io->peeraddr, sizeof(struct sockaddr_in6));
  12. }
  13. accept:
  14. addrlen = sizeof(struct sockaddr_in6);
  15. int connfd = accept(io->fd, io->peeraddr, &addrlen);
  16. if (connfd < 0) {
  17. if (socket_errno() == EAGAIN) {
  18. //goto accept_done;
  19. return;
  20. }
  21. else {
  22. io->error = socket_errno();
  23. perror("accept");
  24. goto accept_error;
  25. }
  26. }
  27. addrlen = sizeof(struct sockaddr_in6);
  28. getsockname(connfd, io->localaddr, &addrlen);
  29. if (io->accept_cb) {
  30. char localaddrstr[INET6_ADDRSTRLEN+16] = {0};
  31. char peeraddrstr[INET6_ADDRSTRLEN+16] = {0};
  32. printd("accept listenfd=%d connfd=%d [%s] <= [%s]\n", io->fd, connfd,
  33. sockaddr_snprintf(io->localaddr, localaddrstr, sizeof(localaddrstr)),
  34. sockaddr_snprintf(io->peeraddr, peeraddrstr, sizeof(peeraddrstr)));
  35. printd("accept_cb------\n");
  36. io->accept_cb(io, connfd);
  37. printd("accept_cb======\n");
  38. }
  39. goto accept;
  40. accept_error:
  41. hclose(io);
  42. }
  43. static void nio_connect(hio_t* io) {
  44. //printd("nio_connect connfd=%d\n", io->fd);
  45. int state = 0;
  46. socklen_t addrlen;
  47. if (io->localaddr == NULL) {
  48. SAFE_ALLOC(io->localaddr, sizeof(struct sockaddr_in6));
  49. addrlen = sizeof(struct sockaddr_in6);
  50. getsockname(io->fd, io->localaddr, &addrlen);
  51. }
  52. if (io->peeraddr == NULL) {
  53. SAFE_ALLOC(io->peeraddr, sizeof(struct sockaddr_in6));
  54. }
  55. addrlen = sizeof(struct sockaddr_in6);
  56. int ret = getpeername(io->fd, io->peeraddr, &addrlen);
  57. if (ret < 0) {
  58. io->error = socket_errno();
  59. printd("connect failed: %s: %d\n", strerror(socket_errno()), socket_errno());
  60. state = 0;
  61. }
  62. else {
  63. char localaddrstr[INET6_ADDRSTRLEN+16] = {0};
  64. char peeraddrstr[INET6_ADDRSTRLEN+16] = {0};
  65. printd("connect connfd=%d [%s] => [%s]\n", io->fd,
  66. sockaddr_snprintf(io->localaddr, localaddrstr, sizeof(localaddrstr)),
  67. sockaddr_snprintf(io->peeraddr, peeraddrstr, sizeof(peeraddrstr)));
  68. state = 1;
  69. }
  70. if (io->connect_cb) {
  71. printd("connect_cb------\n");
  72. io->connect_cb(io, state);
  73. printd("connect_cb======\n");
  74. }
  75. if (state == 0) {
  76. hclose(io);
  77. }
  78. }
  79. static void nio_read(hio_t* io) {
  80. //printd("nio_read fd=%d\n", io->fd);
  81. int nread;
  82. void* buf = io->readbuf.base;
  83. int len = io->readbuf.len;
  84. read:
  85. memset(buf, 0, len);
  86. #ifdef OS_UNIX
  87. nread = read(io->fd, buf, len);
  88. #else
  89. nread = recv(io->fd, buf, len, 0);
  90. #endif
  91. //printd("read retval=%d\n", nread);
  92. if (nread < 0) {
  93. if (socket_errno() == EAGAIN) {
  94. //goto read_done;
  95. return;
  96. }
  97. else {
  98. io->error = socket_errno();
  99. perror("read");
  100. goto read_error;
  101. }
  102. }
  103. if (nread == 0) {
  104. goto disconnect;
  105. }
  106. //printd("> %s\n", buf);
  107. if (io->read_cb) {
  108. printd("read_cb------\n");
  109. io->read_cb(io, buf, nread);
  110. printd("read_cb======\n");
  111. }
  112. if (nread == len) {
  113. goto read;
  114. }
  115. return;
  116. read_error:
  117. disconnect:
  118. hclose(io);
  119. }
  120. static void nio_write(hio_t* io) {
  121. //printd("nio_write fd=%d\n", io->fd);
  122. int nwrite = 0;
  123. write:
  124. if (write_queue_empty(&io->write_queue)) {
  125. return;
  126. }
  127. offset_buf_t* pbuf = write_queue_front(&io->write_queue);
  128. char* buf = pbuf->base + pbuf->offset;
  129. int len = pbuf->len - pbuf->offset;
  130. #ifdef OS_UNIX
  131. nwrite = write(io->fd, buf, len);
  132. #else
  133. nwrite = send(io->fd, buf, len, 0);
  134. #endif
  135. //printd("write retval=%d\n", nwrite);
  136. if (nwrite < 0) {
  137. if (socket_errno() == EAGAIN) {
  138. //goto write_done;
  139. return;
  140. }
  141. else {
  142. io->error = socket_errno();
  143. perror("write");
  144. goto write_error;
  145. }
  146. }
  147. if (nwrite == 0) {
  148. goto disconnect;
  149. }
  150. if (io->write_cb) {
  151. printd("write_cb------\n");
  152. io->write_cb(io, buf, nwrite);
  153. printd("write_cb======\n");
  154. }
  155. pbuf->offset += nwrite;
  156. if (nwrite == len) {
  157. SAFE_FREE(pbuf->base);
  158. write_queue_pop_front(&io->write_queue);
  159. // write next
  160. goto write;
  161. }
  162. return;
  163. write_error:
  164. disconnect:
  165. hclose(io);
  166. }
  167. static void hio_handle_events(hio_t* io) {
  168. if ((io->events & READ_EVENT) && (io->revents & READ_EVENT)) {
  169. if (io->accept) {
  170. nio_accept(io);
  171. }
  172. else {
  173. nio_read(io);
  174. }
  175. }
  176. if ((io->events & WRITE_EVENT) && (io->revents & WRITE_EVENT)) {
  177. if (io->connect) {
  178. // NOTE: connect just do once
  179. // ONESHOT
  180. hio_del(io, WRITE_EVENT);
  181. io->connect = 0;
  182. nio_connect(io);
  183. }
  184. else {
  185. nio_write(io);
  186. // NOTE: del WRITE_EVENT, if write_queue empty
  187. if (write_queue_empty(&io->write_queue)) {
  188. hio_del(io, WRITE_EVENT);
  189. }
  190. }
  191. }
  192. io->revents = 0;
  193. }
  194. hio_t* haccept (hloop_t* loop, int listenfd, haccept_cb accept_cb) {
  195. hio_t* io = hio_add(loop, hio_handle_events, listenfd, READ_EVENT);
  196. if (io == NULL) return NULL;
  197. if (accept_cb) {
  198. io->accept_cb = accept_cb;
  199. }
  200. io->accept = 1;
  201. nonblocking(listenfd);
  202. return io;
  203. }
  204. hio_t* hconnect (hloop_t* loop, const char* host, int port, hconnect_cb connect_cb) {
  205. int connfd = Connect(host, port, 1);
  206. if (connfd < 0) {
  207. return NULL;
  208. }
  209. hio_t* io = hio_add(loop, hio_handle_events, connfd, WRITE_EVENT);
  210. if (io == NULL) {
  211. closesocket(connfd);
  212. return NULL;
  213. }
  214. if (connect_cb) {
  215. io->connect_cb = connect_cb;
  216. }
  217. io->connect = 1;
  218. nonblocking(connfd);
  219. return io;
  220. }
  221. hio_t* hread (hloop_t* loop, int fd, void* buf, size_t len, hread_cb read_cb) {
  222. hio_t* io = hio_add(loop, hio_handle_events, fd, READ_EVENT);
  223. if (io == NULL) return NULL;
  224. io->readbuf.base = (char*)buf;
  225. io->readbuf.len = len;
  226. if (read_cb) {
  227. io->read_cb = read_cb;
  228. }
  229. return io;
  230. }
  231. hio_t* hwrite (hloop_t* loop, int fd, const void* buf, size_t len, hwrite_cb write_cb) {
  232. hio_t* io = hio_add(loop, hio_handle_events, fd, 0);
  233. if (io == NULL) return NULL;
  234. if (write_cb) {
  235. io->write_cb = write_cb;
  236. }
  237. int nwrite = 0;
  238. if (write_queue_empty(&io->write_queue)) {
  239. try_write:
  240. #ifdef OS_UNIX
  241. nwrite = write(fd, buf, len);
  242. #else
  243. nwrite = send(fd, buf, len, 0);
  244. #endif
  245. //printd("write retval=%d\n", nwrite);
  246. if (nwrite < 0) {
  247. if (socket_errno() == EAGAIN) {
  248. nwrite = 0;
  249. goto enqueue;
  250. }
  251. else {
  252. perror("write");
  253. io->error = socket_errno();
  254. goto write_error;
  255. }
  256. }
  257. if (nwrite == 0) {
  258. goto disconnect;
  259. }
  260. if (write_cb) {
  261. printd("try_write_cb------\n");
  262. write_cb(io, buf, nwrite);
  263. printd("try_write_cb======\n");
  264. }
  265. if (nwrite == len) {
  266. //goto write_done;
  267. return io;
  268. }
  269. hio_add(loop, hio_handle_events, fd, WRITE_EVENT);
  270. }
  271. enqueue:
  272. if (nwrite < len) {
  273. offset_buf_t rest;
  274. rest.len = len;
  275. rest.offset = nwrite;
  276. // NOTE: free in nio_write;
  277. SAFE_ALLOC(rest.base, rest.len);
  278. if (rest.base == NULL) return io;
  279. memcpy(rest.base, (char*)buf, rest.len);
  280. if (io->write_queue.maxsize == 0) {
  281. write_queue_init(&io->write_queue, 4);
  282. }
  283. write_queue_push_back(&io->write_queue, &rest);
  284. }
  285. return io;
  286. write_error:
  287. disconnect:
  288. hclose(io);
  289. return io;
  290. }
  291. void hclose (hio_t* io) {
  292. //printd("close fd=%d\n", io->fd);
  293. if (io->closed) return;
  294. #ifdef OS_UNIX
  295. close(io->fd);
  296. #else
  297. closesocket(io->fd);
  298. #endif
  299. io->closed = 1;
  300. if (io->close_cb) {
  301. printd("close_cb------\n");
  302. io->close_cb(io);
  303. printd("close_cb======\n");
  304. }
  305. hio_del(io, ALL_EVENTS);
  306. }
  307. #endif