소스 검색

Impl WebSocketServer

ithewei 4 년 전
부모
커밋
70d228f247

+ 14 - 6
Makefile

@@ -2,7 +2,7 @@ include config.mk
 include Makefile.vars
 
 MAKEF=$(MAKE) -f Makefile.in
-ALL_SRCDIRS=. base utils event protocol http http/client http/server consul
+ALL_SRCDIRS=. base utils event evpp protocol http http/client http/server consul
 
 LIBHV_SRCDIRS = . base utils event evpp
 LIBHV_HEADERS = hv.h hconfig.h hexport.h
@@ -39,6 +39,8 @@ examples: hmain_test htimer_test hloop_test \
 	tcp_chat_server \
 	tcp_proxy_server \
 	http_server_test http_client_test \
+	websocket_server_test \
+	websocket_client_test \
 	consul_cli
 
 clean:
@@ -94,20 +96,26 @@ endif
 
 httpd: prepare
 	$(RM) examples/httpd/*.o
-	$(MAKEF) TARGET=$@ SRCDIRS=". base utils event http http/server examples/httpd"
+	$(MAKEF) TARGET=$@ SRCDIRS=". base utils event evpp http http/server examples/httpd"
 
 curl: prepare
-	$(MAKEF) TARGET=$@ SRCDIRS=". base utils event http http/client" SRCS="examples/curl.cpp"
+	$(MAKEF) TARGET=$@ SRCDIRS=". base utils event evpp http http/client" SRCS="examples/curl.cpp"
 	# $(MAKEF) TARGET=$@ SRCDIRS=". base utils event http http/client" SRCS="examples/curl.cpp" WITH_CURL=yes DEFINES="CURL_STATICLIB"
 
 http_server_test: prepare
-	$(MAKEF) TARGET=$@ SRCDIRS=". base utils event http http/server" SRCS="examples/http_server_test.cpp"
+	$(MAKEF) TARGET=$@ SRCDIRS=". base utils event evpp http http/server" SRCS="examples/http_server_test.cpp"
 
 http_client_test: prepare
-	$(MAKEF) TARGET=$@ SRCDIRS=". base utils event http http/client" SRCS="examples/http_client_test.cpp"
+	$(MAKEF) TARGET=$@ SRCDIRS=". base utils event evpp http http/client" SRCS="examples/http_client_test.cpp"
+
+websocket_server_test: prepare
+	$(MAKEF) TARGET=$@ SRCDIRS=". base utils event evpp http http/server" SRCS="examples/websocket_server_test.cpp"
+
+websocket_client_test: prepare
+	$(MAKEF) TARGET=$@ SRCDIRS=". base utils event evpp http http/client" SRCS="examples/websocket_client_test.cpp"
 
 consul_cli: prepare
-	$(MAKEF) TARGET=$@ SRCDIRS=". base utils event http http/client consul" SRCS="examples/consul_cli.cpp" DEFINES="PRINT_DEBUG"
+	$(MAKEF) TARGET=$@ SRCDIRS=". base utils event evpp http http/client consul" SRCS="examples/consul_cli.cpp" DEFINES="PRINT_DEBUG"
 
 unittest: prepare
 	$(CC)  -g -Wall -O0 -std=c99   -I. -Ibase            -o bin/mkdir_p           unittest/mkdir_test.c         base/hbase.c

+ 8 - 2
Makefile.vars

@@ -68,12 +68,18 @@ PROTOCOL_HEADERS =  protocol/icmp.h\
 HTTP_HEADERS =  http/httpdef.h\
 				http/http2def.h\
 				http/grpcdef.h\
+				http/wsdef.h\
 				http/http_content.h\
 				http/HttpMessage.h\
 				http/HttpParser.h\
+				http/WebSocketParser.h\
+				http/WebSocketChannel.h\
 
-HTTP_CLIENT_HEADERS = http/client/http_client.h http/client/requests.h
+HTTP_CLIENT_HEADERS =   http/client/http_client.h\
+						http/client/requests.h\
 
-HTTP_SERVER_HEADERS = http/server/HttpService.h http/server/HttpServer.h
+HTTP_SERVER_HEADERS =   http/server/HttpService.h\
+						http/server/HttpServer.h\
+						http/server/WebSocketServer.h\
 
 CONSUL_HEADERS = consul/consul.h

+ 9 - 1
cmake/vars.cmake

@@ -72,11 +72,19 @@ set(HTTP_HEADERS
     http/httpdef.h
     http/http2def.h
     http/grpcdef.h
+    http/wsdef.h
     http/http_content.h
     http/HttpMessage.h
     http/HttpParser.h
+    http/WebSocketParser.h
+    http/WebSocketChannel.h
 )
 
 set(HTTP_CLIENT_HEADERS http/client/http_client.h http/client/requests.h)
-set(HTTP_SERVER_HEADERS http/server/HttpService.h http/server/HttpServer.h)
+set(HTTP_SERVER_HEADERS
+    http/server/HttpService.h
+    http/server/HttpServer.h
+    http/server/WebSocketServer.h
+)
+
 set(CONSUL_HEADERS consul/consul.h)

+ 1 - 1
event/hloop.h

@@ -99,7 +99,7 @@ typedef enum {
 #define HIO_DEFAULT_CONNECT_TIMEOUT     5000    // ms
 #define HIO_DEFAULT_CLOSE_TIMEOUT       60000   // ms
 #define HIO_DEFAULT_KEEPALIVE_TIMEOUT   75000   // ms
-#define HIO_DEFAULT_HEARTBEAT_INTERVAL  3000    // ms
+#define HIO_DEFAULT_HEARTBEAT_INTERVAL  30000   // ms
 
 BEGIN_EXTERN_C
 

+ 11 - 3
examples/CMakeLists.txt

@@ -10,7 +10,7 @@ list(APPEND EXAMPLES
     tcp_proxy_server
 )
 
-include_directories(.. ../base ../event ../utils)
+include_directories(.. ../base ../utils ../event ../evpp)
 
 add_executable(hmain_test hmain_test.cpp)
 target_link_libraries(hmain_test hv)
@@ -54,7 +54,11 @@ if(WITH_HTTP_SERVER)
     add_executable(http_server_test http_server_test.cpp)
     target_link_libraries(http_server_test hv)
 
-    list(APPEND EXAMPLES httpd http_server_test)
+    # websocket_server_test
+    add_executable(websocket_server_test websocket_server_test.cpp)
+    target_link_libraries(websocket_server_test hv)
+
+    list(APPEND EXAMPLES httpd http_server_test websocket_server_test)
 endif()
 
 if(WITH_HTTP_CLIENT)
@@ -75,7 +79,11 @@ if(WITH_HTTP_CLIENT)
     add_executable(http_client_test http_client_test.cpp)
     target_link_libraries(http_client_test hv)
 
-    list(APPEND EXAMPLES ${CURL_TARGET_NAME} http_client_test)
+    # websocket_client_test
+    add_executable(websocket_client_test websocket_client_test.cpp)
+    target_link_libraries(websocket_client_test hv)
+
+    list(APPEND EXAMPLES ${CURL_TARGET_NAME} http_client_test websocket_client_test)
 endif()
 
 if(WITH_CONSUL)

+ 11 - 0
examples/websocket_client_test.cpp

@@ -0,0 +1,11 @@
+/*
+ * TODO
+ * @see html/websocket_client.html
+ *
+ */
+
+// #include "WebSocketClient.h"
+
+int main() {
+    return 0;
+}

+ 35 - 0
examples/websocket_server_test.cpp

@@ -0,0 +1,35 @@
+#include "WebSocketServer.h"
+#include "EventLoop.h"
+#include "htime.h"
+
+using namespace hv;
+
+int main() {
+    WebSocketServerCallbacks ws;
+    ws.onopen = [](const WebSocketChannelPtr& channel, const std::string& url) {
+        printf("onopen: GET %s\n", url.c_str());
+        // send(time) every 1s
+        setInterval(1000, [channel](TimerID id) {
+            if (channel->isConnected()) {
+                char str[DATETIME_FMT_BUFLEN] = {0};
+                datetime_t dt = datetime_now();
+                datetime_fmt(&dt, str);
+                channel->send(str);
+            } else {
+                killTimer(id);
+            }
+        });
+    };
+    ws.onmessage = [](const WebSocketChannelPtr& channel, const std::string& msg) {
+        printf("onmessage: %s\n", msg.c_str());
+    };
+    ws.onclose = [](const WebSocketChannelPtr& channel) {
+        printf("onclose\n");
+    };
+
+    websocket_server_t server;
+    server.port = 8888;
+    server.ws = &ws;
+    websocket_server_run(&server);
+    return 0;
+}

+ 46 - 0
http/WebSocketChannel.h

@@ -0,0 +1,46 @@
+#ifndef HV_WEBSOCKET_CHANNEL_H_
+#define HV_WEBSOCKET_CHANNEL_H_
+
+#include "Channel.h"
+
+#include "wsdef.h"
+#include "hmath.h"
+
+namespace hv {
+
+class WebSocketChannel : public SocketChannel {
+public:
+    ws_session_type type;
+    WebSocketChannel(hio_t* io, ws_session_type type = WS_CLIENT)
+        : SocketChannel(io)
+        , type(type)
+    {}
+    ~WebSocketChannel() {}
+
+    // IsConnected, send, close
+
+    int send(const std::string& msg) {
+        bool has_mask = false;
+        char mask[4] = {0};
+        if (type == WS_CLIENT) {
+            has_mask = true;
+            *(int*)mask = rand();
+        }
+        int frame_size = ws_calc_frame_size(msg.size(), has_mask);
+        if (sendbuf.len < frame_size) {
+            sendbuf.resize(ceil2e(frame_size));
+        }
+        ws_build_frame(sendbuf.base, msg.c_str(), msg.size(), mask, has_mask);
+        Buffer buf(sendbuf.base, frame_size);
+        return write(&buf);
+    }
+
+private:
+    Buffer sendbuf;
+};
+
+}
+
+typedef std::shared_ptr<hv::WebSocketChannel> WebSocketChannelPtr;
+
+#endif // HV_WEBSOCKET_CHANNEL_H_

+ 72 - 0
http/WebSocketParser.cpp

@@ -0,0 +1,72 @@
+#include "WebSocketParser.h"
+
+#include "websocket_parser.h"
+
+static int on_frame_header(websocket_parser* parser) {
+    WebSocketParser* wp = (WebSocketParser*)parser->data;
+    int opcode = parser->flags & WS_OP_MASK;
+    // printf("on_frame_header opcode=%d\n", opcode);
+    if (opcode != WS_OP_CONTINUE) {
+        wp->opcode = opcode;
+    }
+    int length = parser->length;
+    if (length && length > wp->message.capacity()) {
+        wp->message.reserve(length);
+    }
+    if (wp->state == WS_FRAME_BEGIN ||
+        wp->state == WS_FRAME_END) {
+        wp->message.resize(0);
+    }
+    wp->state = WS_FRAME_HEADER;
+    return 0;
+}
+
+static int on_frame_body(websocket_parser* parser, const char * at, size_t length) {
+    // printf("on_frame_body length=%d\n", (int)length);
+    WebSocketParser* wp = (WebSocketParser*)parser->data;
+    wp->state = WS_FRAME_BODY;
+    if (wp->parser->flags & WS_HAS_MASK) {
+        websocket_parser_decode((char*)at, at, length, wp->parser);
+    }
+    wp->message.append(at, length);
+    return 0;
+}
+
+static int on_frame_end(websocket_parser* parser) {
+    // printf("on_frame_end\n");
+    WebSocketParser* wp = (WebSocketParser*)parser->data;
+    wp->state = WS_FRAME_END;
+    if (wp->parser->flags & WS_FIN) {
+        if (wp->onMessage) {
+            wp->onMessage(wp->opcode, wp->message);
+        }
+    }
+    return 0;
+}
+
+websocket_parser_settings* WebSocketParser::cbs = NULL;
+
+WebSocketParser::WebSocketParser() {
+    if (cbs == NULL) {
+        cbs = (websocket_parser_settings*)malloc(sizeof(websocket_parser_settings));
+        websocket_parser_settings_init(cbs);
+        cbs->on_frame_header = on_frame_header;
+        cbs->on_frame_body = on_frame_body;
+        cbs->on_frame_end = on_frame_end;
+    }
+    parser = (websocket_parser*)malloc(sizeof(websocket_parser));
+    websocket_parser_init(parser);
+    parser->data = this;
+    state = WS_FRAME_BEGIN;
+}
+
+WebSocketParser::~WebSocketParser() {
+    if (parser) {
+        free(parser);
+        parser = NULL;
+    }
+}
+
+int WebSocketParser::FeedRecvData(const char* data, size_t len) {
+    return websocket_parser_execute(parser, cbs, data, len);
+}

+ 33 - 0
http/WebSocketParser.h

@@ -0,0 +1,33 @@
+#ifndef HV_WEBSOCKET_PARSER_H_
+#define HV_WEBSOCKET_PARSER_H_
+
+#include <memory>
+#include <functional>
+
+enum websocket_parser_state {
+    WS_FRAME_BEGIN,
+    WS_FRAME_HEADER,
+    WS_FRAME_BODY,
+    WS_FRAME_END,
+};
+
+struct websocket_parser_settings;
+struct websocket_parser;
+class WebSocketParser {
+public:
+    static websocket_parser_settings*   cbs;
+    websocket_parser*                   parser;
+    websocket_parser_state              state;
+    int                                 opcode;
+    std::string                         message;
+    std::function<void(int opcode, const std::string& msg)> onMessage;
+
+    WebSocketParser();
+    ~WebSocketParser();
+
+    int FeedRecvData(const char* data, size_t len);
+};
+
+typedef std::shared_ptr<WebSocketParser> WebSocketParserPtr;
+
+#endif // HV_WEBSOCKET_PARSER_H_

+ 1 - 1
http/client/AsyncHttpClient.cpp

@@ -56,7 +56,7 @@ int AsyncHttpClient::doTask(const HttpClientTaskPtr& task) {
     assert(channel != NULL);
     HttpClientContext* ctx = channel->getContext<HttpClientContext>();
     ctx->task = task;
-    channel->onconnect = [this, &channel]() {
+    channel->onconnect = [&channel]() {
         sendRequest(channel);
     };
     channel->onread = [this, &channel](Buffer* buf) {

+ 78 - 15
http/server/HttpHandler.h

@@ -5,48 +5,111 @@
 #include "HttpParser.h"
 #include "FileCache.h"
 
+#include "WebSocketServer.h"
+#include "WebSocketParser.h"
+
+#include "hlog.h"
+
+class WebSocketHandler {
+public:
+    WebSocketChannelPtr         channel;
+    WebSocketParserPtr          parser;
+    uint64_t                    last_send_ping_time;
+    uint64_t                    last_recv_pong_time;
+
+    WebSocketHandler() {
+        parser.reset(new WebSocketParser);
+        // channel.reset(new WebSocketChannel);
+        last_send_ping_time = 0;
+        last_recv_pong_time = 0;
+    }
+
+    void onopen() {
+        channel->status = hv::SocketChannel::CONNECTED;
+        /*
+        channel->onread = [this](hv::Buffer* buf) {
+            const char* data = (const char*)buf->data();
+            int size= buf->size();
+            int nfeed = parser->FeedRecvData(data, size);
+            if (nfeed != size) {
+                hloge("websocket parse error!");
+                channel->close();
+            }
+        };
+        */
+    }
+
+    void onclose() {
+        channel->status = hv::SocketChannel::DISCONNECTED;
+    }
+};
+typedef std::shared_ptr<WebSocketHandler> WebSocketHandlerPtr;
+
 class HttpHandler {
 public:
-    enum ProtoType {
+    enum ProtocolType {
         UNKNOWN,
         HTTP_V1,
         HTTP_V2,
         WEBSOCKET,
-    } proto;
+    } protocol;
 
     // peeraddr
     char                    ip[64];
     int                     port;
-    // for handle_request
-    HttpService*            service;
-    FileCache*              files;
-    file_cache_t*           fc;
+
+    // for http
+    HttpService             *service;
+    FileCache               *files;
+    file_cache_t            *fc;
 
     HttpRequest             req;
     HttpResponse            res;
     HttpParserPtr           parser;
 
+    // for websocket
+    WebSocketHandlerPtr         ws;
+    WebSocketServerCallbacks*   ws_cbs;
+
     HttpHandler() {
-        proto = UNKNOWN;
+        protocol = UNKNOWN;
         service = NULL;
         files = NULL;
         fc = NULL;
+        ws_cbs = NULL;
     }
 
-    ~HttpHandler() {
+    void Reset() {
+        req.Reset();
+        res.Reset();
+        fc = NULL;
     }
 
     // @workflow: preprocessor -> api -> web -> postprocessor
     // @result: HttpRequest -> HttpResponse/file_cache_t
     int HandleHttpRequest();
 
-    // TODO
-    // int HandleWebsocketMessage(void* buf, int len);
-
-    void Reset() {
-        req.Reset();
-        res.Reset();
-        fc = NULL;
+    // websocket
+    WebSocketHandler* SwitchWebSocket() {
+        ws.reset(new WebSocketHandler);
+        return ws.get();
+    }
+    void WebSocketOnOpen() {
+        ws->onopen();
+        if (ws_cbs && ws_cbs->onopen) {
+            ws_cbs->onopen(ws->channel, req.url);
+        }
+    }
+    void WebSocketOnClose() {
+        ws->onclose();
+        if (ws_cbs && ws_cbs->onclose) {
+            ws_cbs->onclose(ws->channel);
+        }
+    }
+    void WebSocketOnMessage(const std::string& msg) {
+        if (ws_cbs && ws_cbs->onmessage) {
+            ws_cbs->onmessage(ws->channel, msg);
+        }
     }
 };
 

+ 150 - 54
http/server/HttpServer.cpp

@@ -2,34 +2,90 @@
 
 #include "hv.h"
 #include "hmain.h"
-#include "hloop.h"
 
 #include "httpdef.h"
 #include "http2def.h"
 #include "wsdef.h"
 
+#include "EventLoop.h"
+using namespace hv;
+
 #include "HttpHandler.h"
 
 #define MIN_HTTP_REQUEST        "GET / HTTP/1.1\r\n\r\n"
 #define MIN_HTTP_REQUEST_LEN    14 // exclude CRLF
 
-static HttpService  s_default_service;
-static FileCache    s_filecache;
+static void on_accept(hio_t* io);
+static void on_recv(hio_t* io, void* _buf, int readbytes);
+static void on_close(hio_t* io);
+
+static HttpService* default_http_service() {
+    static HttpService* s_default_service = new HttpService;
+    return s_default_service;
+}
+
+static FileCache* default_filecache() {
+    static FileCache s_filecache;
+    return &s_filecache;
+}
 
 struct HttpServerPrivdata {
-    int                     quit;
     std::vector<hloop_t*>   loops;
     std::vector<hthread_t>  threads;
     std::mutex              mutex_;
 };
 
+static void websocket_heartbeat(hio_t* io) {
+    HttpHandler* handler = (HttpHandler*)hevent_userdata(io);
+    WebSocketHandler* ws = handler->ws.get();
+    if (ws->last_recv_pong_time < ws->last_send_ping_time) {
+        hlogw("[%s:%d] websocket no pong!", handler->ip, handler->port);
+        hio_close(io);
+    } else {
+        // printf("send ping\n");
+        hio_write(io, WS_SERVER_PING_FRAME, WS_SERVER_MIN_FRAME_SIZE);
+        ws->last_send_ping_time = gethrtime_us();
+    }
+}
+
+static void websocket_onmessage(int opcode, const std::string& msg, hio_t* io) {
+    HttpHandler* handler = (HttpHandler*)hevent_userdata(io);
+    WebSocketHandler* ws = handler->ws.get();
+    switch(opcode) {
+    case WS_OPCODE_CLOSE:
+        hio_close(io);
+        break;
+    case WS_OPCODE_PING:
+        // printf("recv ping\n");
+        // printf("send pong\n");
+        hio_write(io, WS_SERVER_PONG_FRAME, WS_SERVER_MIN_FRAME_SIZE);
+        break;
+    case WS_OPCODE_PONG:
+        // printf("recv pong\n");
+        ws->last_recv_pong_time = gethrtime_us();
+        break;
+    case WS_OPCODE_TEXT:
+    case WS_OPCODE_BINARY:
+        // onmessage
+        handler->WebSocketOnMessage(msg);
+        break;
+    default:
+        break;
+    }
+}
+
 static void on_recv(hio_t* io, void* _buf, int readbytes) {
     // printf("on_recv fd=%d readbytes=%d\n", hio_fd(io), readbytes);
     const char* buf = (const char*)_buf;
     HttpHandler* handler = (HttpHandler*)hevent_userdata(io);
 
-    if (handler->proto == HttpHandler::WEBSOCKET) {
-        // TODO: HandleWebsocketMessage
+    if (handler->protocol == HttpHandler::WEBSOCKET) {
+        WebSocketParser* parser = handler->ws->parser.get();
+        int nfeed = parser->FeedRecvData(buf, readbytes);
+        if (nfeed != readbytes) {
+            hloge("[%s:%d] websocket parse error!", handler->ip, handler->port);
+            hio_close(io);
+        }
         return;
     }
 
@@ -52,10 +108,10 @@ static void on_recv(hio_t* io, void* _buf, int readbytes) {
             }
         }
         http_version version = HTTP_V1;
-        handler->proto = HttpHandler::HTTP_V1;
+        handler->protocol = HttpHandler::HTTP_V1;
         if (strncmp((char*)buf, HTTP2_MAGIC, MIN(readbytes, HTTP2_MAGIC_LEN)) == 0) {
             version = HTTP_V2;
-            handler->proto = HttpHandler::HTTP_V2;
+            handler->protocol = HttpHandler::HTTP_V2;
             handler->req.http_major = 2;
             handler->req.http_minor = 0;
         }
@@ -138,7 +194,7 @@ static void on_recv(hio_t* io, void* _buf, int readbytes) {
                 ws_encode_key(iter_key->second.c_str(), ws_accept);
                 res->headers[SEC_WEBSOCKET_ACCEPT] = ws_accept;
             }
-            handler->proto = HttpHandler::WEBSOCKET;
+            handler->protocol = HttpHandler::WEBSOCKET;
         }
         // h2/h2c
         else if (strnicmp(upgrade_proto, "h2", 2) == 0) {
@@ -210,6 +266,22 @@ static void on_recv(hio_t* io, void* _buf, int readbytes) {
         http_method_str(req->method), req->path.c_str(),
         res->status_code, res->status_message());
 
+    // switch protocol to websocket
+    if (upgrade && handler->protocol == HttpHandler::WEBSOCKET) {
+        WebSocketHandler* ws = handler->SwitchWebSocket();
+        ws->channel.reset(new WebSocketChannel(io, WS_SERVER));
+        ws->parser->onMessage = std::bind(websocket_onmessage, std::placeholders::_1, std::placeholders::_2, io);
+        // NOTE: need to reset callbacks
+        hio_setcb_read(io, on_recv);
+        hio_setcb_close(io, on_close);
+        // NOTE: cancel keepalive timer, judge alive by heartbeat.
+        hio_set_keepalive_timeout(io, 0);
+        hio_set_heartbeat(io, HIO_DEFAULT_HEARTBEAT_INTERVAL, websocket_heartbeat);
+        // onopen
+        handler->WebSocketOnOpen();
+        return;
+    }
+
     if (keepalive) {
         handler->Reset();
         parser->InitRequest(req);
@@ -221,14 +293,18 @@ static void on_recv(hio_t* io, void* _buf, int readbytes) {
 static void on_close(hio_t* io) {
     HttpHandler* handler = (HttpHandler*)hevent_userdata(io);
     if (handler) {
+        if (handler->protocol == HttpHandler::WEBSOCKET) {
+            // onclose
+            handler->WebSocketOnClose();
+        }
         hevent_set_userdata(io, NULL);
         delete handler;
     }
 }
 
 static void on_accept(hio_t* io) {
-    printd("on_accept connfd=%d\n", hio_fd(io));
     /*
+    printf("on_accept connfd=%d\n", hio_fd(io));
     char localaddrstr[SOCKADDR_STRLEN] = {0};
     char peeraddrstr[SOCKADDR_STRLEN] = {0};
     printf("accept connfd=%d [%s] <= [%s]\n", hio_fd(io),
@@ -240,25 +316,26 @@ static void on_accept(hio_t* io) {
     hio_setcb_read(io, on_recv);
     hio_read(io);
     hio_set_keepalive_timeout(io, HIO_DEFAULT_KEEPALIVE_TIMEOUT);
-    // new HttpHandler
-    // delete on_close
+    // new HttpHandler, delete on_close
     HttpHandler* handler = new HttpHandler;
-    handler->service = (HttpService*)hevent_userdata(io);
-    handler->files = &s_filecache;
+    // ip
     sockaddr_ip((sockaddr_u*)hio_peeraddr(io), handler->ip, sizeof(handler->ip));
+    // port
     handler->port = sockaddr_port((sockaddr_u*)hio_peeraddr(io));
+    // service
+    http_server_t* server = (http_server_t*)hevent_userdata(io);
+    handler->service = server->service;
+    // ws
+    handler->ws_cbs = server->ws;
+    // FileCache
+    handler->files = default_filecache();
     hevent_set_userdata(io, handler);
 }
 
 static void handle_cached_files(htimer_t* timer) {
     FileCache* pfc = (FileCache*)hevent_userdata(timer);
-    if (pfc == NULL) {
-        htimer_del(timer);
-        return;
-    }
     file_cache_t* fc = NULL;
-    time_t tt;
-    time(&tt);
+    time_t tt = time(NULL);
     std::lock_guard<std::mutex> locker(pfc->mutex_);
     auto iter = pfc->cached_files.begin();
     while (iter != pfc->cached_files.end()) {
@@ -272,88 +349,107 @@ static void handle_cached_files(htimer_t* timer) {
     }
 }
 
-static void fsync_logfile(hidle_t* idle) {
-    hlog_fsync();
-}
-
-static void worker_fn(void* userdata) {
+static void loop_thread(void* userdata) {
     http_server_t* server = (http_server_t*)userdata;
     int listenfd = server->listenfd;
-    hloop_t* loop = hloop_new(0);
-    hio_t* listenio = haccept(loop, listenfd, on_accept);
-    hevent_set_userdata(listenio, server->service);
+
+    EventLoopPtr loop(new EventLoop);
+    hloop_t* hloop = loop->loop();
+    hio_t* listenio = haccept(hloop, listenfd, on_accept);
+    hevent_set_userdata(listenio, server);
     if (server->ssl) {
         hio_enable_ssl(listenio);
     }
     // fsync logfile when idle
     hlog_disable_fsync();
-    hidle_add(loop, fsync_logfile, INFINITE);
+    hidle_add(hloop, [](hidle_t*) {
+        hlog_fsync();
+    }, INFINITE);
     // timer handle_cached_files
-    htimer_t* timer = htimer_add(loop, handle_cached_files, s_filecache.file_cached_time * 1000);
-    hevent_set_userdata(timer, &s_filecache);
+    FileCache* filecache = default_filecache();
+    htimer_t* timer = htimer_add(hloop, handle_cached_files, filecache->file_cached_time * 1000);
+    hevent_set_userdata(timer, filecache);
 
-    // for SDK implement http_server_stop
     HttpServerPrivdata* privdata = (HttpServerPrivdata*)server->privdata;
     if (privdata) {
-        std::lock_guard<std::mutex> locker(privdata->mutex_);
-        if (privdata->quit) {
-            hloop_free(&loop);
-            return;
-        }
-        privdata->loops.push_back(loop);
+        privdata->mutex_.lock();
+        privdata->loops.push_back(hloop);
+        privdata->mutex_.unlock();
     }
 
-    hloop_run(loop);
-    hloop_free(&loop);
+    loop->run();
 }
 
 int http_server_run(http_server_t* server, int wait) {
-    // service
-    if (server->service == NULL) {
-        server->service = &s_default_service;
-    }
     // port
     server->listenfd = Listen(server->port, server->host);
     if (server->listenfd < 0) return server->listenfd;
+    // service
+    if (server->service == NULL) {
+        server->service = default_http_service();
+    }
 
     if (server->worker_processes) {
         // multi-processes
-        return master_workers_run(worker_fn, server, server->worker_processes, server->worker_threads, wait);
+        return master_workers_run(loop_thread, server, server->worker_processes, server->worker_threads, wait);
     }
     else {
         // multi-threads
-        int worker_threads = server->worker_threads;
-        if (worker_threads == 0) worker_threads = 1;
+        if (server->worker_threads == 0) server->worker_threads = 1;
 
         // for SDK implement http_server_stop
         HttpServerPrivdata* privdata = new HttpServerPrivdata;
-        privdata->quit = 0;
         server->privdata = privdata;
 
         int i = wait ? 1 : 0;
-        for (; i < worker_threads; ++i) {
-            hthread_t thrd = hthread_create((hthread_routine)worker_fn, server);
+        for (; i < server->worker_threads; ++i) {
+            hthread_t thrd = hthread_create((hthread_routine)loop_thread, server);
             privdata->threads.push_back(thrd);
         }
         if (wait) {
-            worker_fn(server);
+            loop_thread(server);
         }
         return 0;
     }
 }
 
 int http_server_stop(http_server_t* server) {
+#ifdef OS_UNIX
+    if (server->worker_processes) {
+        signal_handle("stop");
+        return 0;
+    }
+#endif
+
     HttpServerPrivdata* privdata = (HttpServerPrivdata*)server->privdata;
     if (privdata == NULL) return 0;
 
+    // wait for all threads started and all loops running
+    while (1) {
+        hv_delay(1);
+        std::lock_guard<std::mutex> locker(privdata->mutex_);
+        // wait for all loops created
+        if (privdata->loops.size() < server->worker_threads) {
+            continue;
+        }
+        // wait for all loops running
+        for (auto& loop : privdata->loops) {
+            if (hloop_status(loop) == HLOOP_STATUS_STOP) {
+                continue;
+            }
+        }
+        break;
+    }
+
+    // stop all loops
     privdata->mutex_.lock();
-    privdata->quit = 1;
-    for (auto loop : privdata->loops) {
+    for (auto& loop : privdata->loops) {
         hloop_stop(loop);
     }
     privdata->mutex_.unlock();
 
-    for (auto thrd : privdata->threads) {
+    // join all threads
+    for (auto& thrd : privdata->threads) {
         hthread_join(thrd);
     }
 

+ 6 - 3
http/server/HttpServer.h

@@ -4,14 +4,16 @@
 #include "hexport.h"
 #include "HttpService.h"
 
+struct WebSocketServerCallbacks;
 typedef struct http_server_s {
     char host[64];
     int port;
     int ssl;
     int http_version;
-    HttpService* service;
     int worker_processes;
     int worker_threads;
+    HttpService* service;
+    WebSocketServerCallbacks* ws;
     void* userdata;
 //private:
     int listenfd;
@@ -23,9 +25,10 @@ typedef struct http_server_s {
         port = DEFAULT_HTTP_PORT;
         ssl = 0;
         http_version = 1;
-        service = NULL;
         worker_processes = 0;
         worker_threads = 0;
+        service = NULL;
+        ws = NULL;
         listenfd = -1;
         userdata = NULL;
         privdata = NULL;
@@ -54,7 +57,7 @@ int main() {
 */
 HV_EXPORT int http_server_run(http_server_t* server, int wait = 1);
 
-// just for worker_processes = 0
+// NOTE: stop all loops and join all threads
 HV_EXPORT int http_server_stop(http_server_t* server);
 
 #endif

+ 17 - 0
http/server/WebSocketServer.h

@@ -0,0 +1,17 @@
+#ifndef HV_WEBSOCKET_SERVER_H_
+#define HV_WEBSOCKET_SERVER_H_
+
+#include "HttpServer.h"
+#include "WebSocketChannel.h"
+
+struct WebSocketServerCallbacks {
+    std::function<void(const WebSocketChannelPtr&, const std::string&)> onopen;
+    std::function<void(const WebSocketChannelPtr&, const std::string&)> onmessage;
+    std::function<void(const WebSocketChannelPtr&)>                     onclose;
+};
+
+#define websocket_server_t      http_server_t
+#define websocket_server_run    http_server_run
+#define websocket_server_stop   http_server_stop
+
+#endif // HV_WEBSOCKET_SERVER_H_

+ 17 - 0
http/wsdef.h

@@ -10,6 +10,23 @@
 #define SEC_WEBSOCKET_KEY       "Sec-WebSocket-Key"
 #define SEC_WEBSOCKET_ACCEPT    "Sec-WebSocket-Accept"
 
+#define WS_SERVER_MIN_FRAME_SIZE    2
+// 1000 1001 0000 0000
+#define WS_SERVER_PING_FRAME        "\211\0"
+// 1000 1010 0000 0000
+#define WS_SERVER_PONG_FRAME        "\212\0"
+
+#define WS_CLIENT_MIN_FRAME_SIZE    6
+// 1000 1001 1000 0000
+#define WS_CLIENT_PING_FRAME        "\211\200WSWS"
+// 1000 1010 1000 0000
+#define WS_CLIENT_PONG_FRAME        "\212\200WSWS"
+
+enum ws_session_type {
+    WS_CLIENT,
+    WS_SERVER,
+};
+
 enum ws_opcode {
     WS_OPCODE_CONTINUE = 0x0,
     WS_OPCODE_TEXT     = 0x1,