#include "LibLsp/JsonRpc/MessageIssue.h" #include "LibLsp/JsonRpc/WebSocketServer.h" #include #include #include #include "LibLsp/JsonRpc/stream.h" #include #include #include namespace beast = boost::beast; // from namespace http = beast::http; // from namespace websocket = beast::websocket; // from namespace net = boost::asio; // from using tcp = boost::asio::ip::tcp; // from namespace lsp { // Echoes back all received WebSocket messages class server_session : public std::enable_shared_from_this { websocket::stream ws_; beast::flat_buffer buffer_; std::string user_agent_; public: std::shared_ptr proxy_; // Take ownership of the socket explicit server_session(tcp::socket&& socket,const std::string& user_agent) : ws_(std::move(socket)),user_agent_(user_agent) { proxy_ = std::make_shared(ws_); } // Get on the correct executor void run() { // We need to be executing within a strand to perform async operations // on the I/O objects in this server_session. Although not strictly necessary // for single-threaded contexts, this example code is written to be // thread-safe by default. net::dispatch(ws_.get_executor(), beast::bind_front_handler( &server_session::on_run, shared_from_this())); } // Start the asynchronous operation void on_run() { // Set suggested timeout settings for the websocket ws_.set_option( websocket::stream_base::timeout::suggested( beast::role_type::server)); // Set a decorator to change the Server of the handshake ws_.set_option(websocket::stream_base::decorator( [=](websocket::response_type& res) { res.set(http::field::server, user_agent_.c_str()); })); // Accept the websocket handshake ws_.async_accept( beast::bind_front_handler( &server_session::on_accept, shared_from_this())); } void on_accept(beast::error_code ec) { if (ec) return ; // Read a message // Read a message into our buffer ws_.async_read( buffer_, beast::bind_front_handler( &server_session::on_read, shared_from_this())); } void on_read( beast::error_code ec, std::size_t bytes_transferred) { if(!ec) { char* data = reinterpret_cast(buffer_.data().data()); std::vector elements(data, data + bytes_transferred); buffer_.clear(); proxy_->on_request.EnqueueAll(std::move(elements), false); // Read a message into our buffer ws_.async_read( buffer_, beast::bind_front_handler( &server_session::on_read, shared_from_this())); return; } if (ec){ proxy_->error_message = ec.message(); } } void close() { if(ws_.is_open()) { boost::system::error_code ec; ws_.close(websocket::close_code::normal, ec); } } }; //------------------------------------------------------------------------------ struct WebSocketServer::Data { Data(const std::string& user_agent, lsp::Log& log) : acceptor_(io_context_), user_agent_(user_agent), _log(log) { } ~Data() { } /// The io_context used to perform asynchronous operations. boost::asio::io_context io_context_; std::shared_ptr work; /// Acceptor used to listen for incoming connections. boost::asio::ip::tcp::acceptor acceptor_; std::shared_ptr < server_session> _server_session; std::string user_agent_; lsp::Log& _log; }; websocket_stream_wrapper::websocket_stream_wrapper(boost::beast::websocket::stream& _w): ws_(_w), request_waiter(new MultiQueueWaiter()), on_request(request_waiter) { } bool websocket_stream_wrapper::fail() { return bad(); } bool websocket_stream_wrapper::eof() { return bad(); } bool websocket_stream_wrapper::good() { return !bad(); } websocket_stream_wrapper& websocket_stream_wrapper::read(char* str, std::streamsize count) { auto some = on_request.TryDequeueSome(static_cast(count)); memcpy(str,some.data(),some.size()); for (std::streamsize i = some.size(); i < count; ++i) { str[i] = static_cast(get()); } return *this; } int websocket_stream_wrapper::get() { return on_request.Dequeue(); } bool websocket_stream_wrapper::bad() { return !ws_.next_layer().socket().is_open(); } websocket_stream_wrapper& websocket_stream_wrapper::write(const std::string& c) { ws_.write(boost::asio::buffer(std::string(c))); return *this; } websocket_stream_wrapper& websocket_stream_wrapper::write(std::streamsize _s) { std::ostringstream temp; temp << _s; ws_.write(boost::asio::buffer(temp.str())); return *this; } websocket_stream_wrapper& websocket_stream_wrapper::flush() { return *this; } void websocket_stream_wrapper::clear() { } std::string websocket_stream_wrapper::what() { if (!error_message.empty()) return error_message; if (!ws_.next_layer().socket().is_open()) { return "Socket is not open."; } return {}; } WebSocketServer::~WebSocketServer() { delete d_ptr; } WebSocketServer::WebSocketServer(const std::string& user_agent, const std::string& address, const std::string& port, std::shared_ptr < MessageJsonHandler> json_handler, std::shared_ptr < Endpoint> localEndPoint, lsp::Log& log, uint32_t _max_workers) : point(json_handler,localEndPoint,log,lsp::JSONStreamStyle::Standard, _max_workers),d_ptr(new Data(user_agent,log)) { d_ptr->work = std::make_shared(d_ptr->io_context_); // Open the acceptor with the option to reuse the address (i.e. SO_REUSEADDR). boost::asio::ip::tcp::resolver resolver(d_ptr->io_context_); boost::asio::ip::tcp::endpoint endpoint = *resolver.resolve(address, port).begin(); d_ptr->acceptor_.open(endpoint.protocol()); d_ptr->acceptor_.set_option(boost::asio::ip::tcp::acceptor::reuse_address(true)); try { d_ptr->acceptor_.bind(endpoint); } catch (boost::system::system_error & e) { std::string temp = "Socket Server blid faild."; d_ptr->_log.log(lsp::Log::Level::INFO , temp + e.what()); return; } d_ptr->acceptor_.listen(); do_accept(); std::string desc = "Socket WebSocketServer " + address + " " + port + " start."; d_ptr->_log.log(lsp::Log::Level::INFO, desc); } void WebSocketServer::run() { // The io_context::run() call will block until all asynchronous operations // have finished. While the WebSocketServer is running, there is always at least one // asynchronous operation outstanding: the asynchronous accept call waiting // for new incoming connections. d_ptr->io_context_.run(); } void WebSocketServer::stop() { try { if(d_ptr->work) d_ptr->work.reset(); do_stop(); } catch (...) { } } void WebSocketServer::do_accept() { d_ptr->acceptor_.async_accept( [this](boost::system::error_code ec, boost::asio::ip::tcp::socket socket) { // Check whether the WebSocketServer was stopped by a signal before this // completion handler had a chance to run. if (!d_ptr->acceptor_.is_open()) { return; } if (!ec) { if(d_ptr->_server_session) { try { d_ptr->_server_session->close(); point.stop(); } catch (...) { } } d_ptr->_server_session = std::make_shared(std::move(socket), d_ptr->user_agent_); d_ptr->_server_session->run(); point.startProcessingMessages(d_ptr->_server_session->proxy_, d_ptr->_server_session->proxy_); do_accept(); } }); } void WebSocketServer::do_stop() { d_ptr->acceptor_.close(); point.stop(); } } // namespace