Bläddra i källkod

mark thread-safe

ithewei 4 år sedan
förälder
incheckning
db02e31fe1
9 ändrade filer med 23 tillägg och 7 borttagningar
  1. 2 0
      evpp/Channel.h
  2. 1 0
      evpp/EventLoop.h
  3. 1 0
      evpp/EventLoopThread.h
  4. 1 0
      evpp/EventLoopThreadPool.h
  5. 5 2
      evpp/TcpClient.h
  6. 4 1
      evpp/TcpServer.h
  7. 1 0
      evpp/TimerThread.h
  8. 4 2
      evpp/UdpClient.h
  9. 4 2
      evpp/UdpServer.h

+ 2 - 0
evpp/Channel.h

@@ -111,6 +111,7 @@ public:
         return hio_readbytes(io_, len);
     }
 
+    // write thread-safe
     int write(const void* data, int size) {
         if (!isOpened()) return -1;
         return hio_write(io_, data, size);
@@ -124,6 +125,7 @@ public:
         return write(str.data(), str.size());
     }
 
+    // close thread-safe
     int close(bool async = false) {
         if (!isOpened()) return -1;
         if (async) {

+ 1 - 0
evpp/EventLoop.h

@@ -52,6 +52,7 @@ public:
         setStatus(kStopped);
     }
 
+    // stop thread-safe
     void stop() {
         if (loop_ == NULL) return;
         if (status() < kRunning) {

+ 1 - 0
evpp/EventLoopThread.h

@@ -60,6 +60,7 @@ public:
     }
 
     // @param wait_thread_started: if ture this method will block until loop_thread stopped.
+    // stop thread-safe
     void stop(bool wait_thread_stopped = false) {
         if (status() < kStarting || status() >= kStopping) return;
         setStatus(kStopping);

+ 1 - 0
evpp/EventLoopThreadPool.h

@@ -92,6 +92,7 @@ public:
     }
 
     // @param wait_threads_started: if ture this method will block until all loop_threads stopped.
+    // stop thread-safe
     void stop(bool wait_threads_stopped = false) {
         if (status() < kStarting || status() >= kStopping) return;
         setStatus(kStopping);

+ 5 - 2
evpp/TcpClient.h

@@ -80,10 +80,11 @@ public:
         channel.reset(new TSocketChannel(io));
         return connfd;
     }
+    // closesocket thread-safe
     void closesocket() {
+        enable_reconnect = false;
         if (channel) {
-            channel->close();
-            channel = NULL;
+            channel->close(true);
         }
     }
 
@@ -156,6 +157,7 @@ public:
     void start(bool wait_threads_started = true) {
         loop_thread.start(wait_threads_started, std::bind(&TcpClientTmpl::startConnect, this));
     }
+    // stop thread-safe
     void stop(bool wait_threads_stopped = true) {
         enable_reconnect = false;
         loop_thread.stop(wait_threads_stopped);
@@ -166,6 +168,7 @@ public:
         return channel->isConnected();
     }
 
+    // send thread-safe
     int send(const void* data, int size) {
         if (!isConnected()) return -1;
         return channel->write(data, size);

+ 4 - 1
evpp/TcpServer.h

@@ -34,9 +34,10 @@ public:
         listenfd = Listen(port, host);
         return listenfd;
     }
+    // closesocket thread-safe
     void closesocket() {
         if (listenfd >= 0) {
-            ::closesocket(listenfd);
+            hio_close_async(hio_get(acceptor_thread.hloop(), listenfd));
             listenfd = -1;
         }
     }
@@ -62,6 +63,7 @@ public:
         worker_threads.start(wait_threads_started);
         acceptor_thread.start(wait_threads_started, std::bind(&TcpServerTmpl::startAccept, this));
     }
+    // stop thread-safe
     void stop(bool wait_threads_stopped = true) {
         acceptor_thread.stop(wait_threads_stopped);
         worker_threads.stop(wait_threads_stopped);
@@ -120,6 +122,7 @@ public:
         return channels.size();
     }
 
+    // broadcast thread-safe
     int broadcast(const void* data, int size) {
         return foreachChannel([data, size](const TSocketChannelPtr& channel) {
             channel->write(data, size);

+ 1 - 0
evpp/TimerThread.h

@@ -19,6 +19,7 @@ public:
     }
 
 public:
+    // setTimer, setTimeout, killTimer, resetTimer thread-safe
     TimerID setTimer(int timeout_ms, TimerCallback cb, uint32_t repeat = INFINITE) {
         printf("TimerThread::setTimer\n");
         TimerID timerID = ++nextTimerID;

+ 4 - 2
evpp/UdpClient.h

@@ -33,10 +33,10 @@ public:
         channel.reset(new TSocketChannel(io));
         return channel->fd();
     }
+    // closesocket thread-safe
     void closesocket() {
         if (channel) {
-            channel->close();
-            channel = NULL;
+            channel->close(true);
         }
     }
 
@@ -63,10 +63,12 @@ public:
     void start(bool wait_threads_started = true) {
         loop_thread.start(wait_threads_started, std::bind(&UdpClientTmpl::startRecv, this));
     }
+    // stop thread-safe
     void stop(bool wait_threads_stopped = true) {
         loop_thread.stop(wait_threads_stopped);
     }
 
+    // sendto thread-safe
     int sendto(const void* data, int size, struct sockaddr* peeraddr = NULL) {
         if (channel == NULL) return -1;
         std::lock_guard<std::mutex> locker(sendto_mutex);

+ 4 - 2
evpp/UdpServer.h

@@ -33,10 +33,10 @@ public:
         channel.reset(new TSocketChannel(io));
         return channel->fd();
     }
+    // closesocket thread-safe
     void closesocket() {
         if (channel) {
-            channel->close();
-            channel = NULL;
+            channel->close(true);
         }
     }
 
@@ -63,10 +63,12 @@ public:
     void start(bool wait_threads_started = true) {
         loop_thread.start(wait_threads_started, std::bind(&UdpServerTmpl::startRecv, this));
     }
+    // stop thread-safe
     void stop(bool wait_threads_stopped = true) {
         loop_thread.stop(wait_threads_stopped);
     }
 
+    // sendto thread-safe
     int sendto(const void* data, int size, struct sockaddr* peeraddr = NULL) {
         if (channel == NULL) return -1;
         std::lock_guard<std::mutex> locker(sendto_mutex);