Skip to content

Commit 1c82d37

Browse files
fix: Add queue size limit for websocket (#1701)
For slow clients, the server will close the connection if the client's message queue becomes too long. --------- Co-authored-by: Sergey Kuznetsov <[email protected]>
1 parent f083c82 commit 1c82d37

File tree

8 files changed

+132
-96
lines changed

8 files changed

+132
-96
lines changed

docs/examples/config/example-config.json

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -75,9 +75,9 @@
7575
// For sequent policy, requests from one client connection will be processed one by one and the next one will not be read before
7676
// the previous one is processed. For parallel policy Clio will take all requests and process them in parallel and
7777
// send a reply for each request whenever it is ready.
78-
"parallel_requests_limit": 10 // Optional parameter, used only if "processing_strategy" is "parallel".
79-
It limits the number of requests for one client connection processed in parallel. Infinite if not specified.
80-
78+
"parallel_requests_limit": 10, // Optional parameter, used only if "processing_strategy" is "parallel". It limits the number of requests for one client connection processed in parallel. Infinite if not specified.
79+
// Maximum number of responses that can be queued while waiting to be sent. If a client's waiting queue grows too long, the server will close the connection.
80+
"ws_max_sending_queue_size": 1500
8181
},
8282
// Time in seconds for graceful shutdown. Defaults to 10 seconds. Not fully implemented yet.
8383
"graceful_period": 10.0,

src/web/HttpSession.hpp

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
#include <boost/beast/core/flat_buffer.hpp>
3131
#include <boost/beast/core/tcp_stream.hpp>
3232

33+
#include <cstdint>
3334
#include <functional>
3435
#include <memory>
3536
#include <string>
@@ -52,6 +53,7 @@ class HttpSession : public impl::HttpBase<HttpSession, HandlerType>,
5253
public std::enable_shared_from_this<HttpSession<HandlerType>> {
5354
boost::beast::tcp_stream stream_;
5455
std::reference_wrapper<util::TagDecoratorFactory const> tagFactory_;
56+
std::uint32_t maxWsSendingQueueSize_;
5557

5658
public:
5759
/**
@@ -64,6 +66,7 @@ class HttpSession : public impl::HttpBase<HttpSession, HandlerType>,
6466
* @param dosGuard The denial of service guard to use
6567
* @param handler The server handler to use
6668
* @param buffer Buffer with initial data received from the peer
69+
* @param maxWsSendingQueueSize The maximum size of the sending queue for websocket
6770
*/
6871
explicit HttpSession(
6972
tcp::socket&& socket,
@@ -72,7 +75,8 @@ class HttpSession : public impl::HttpBase<HttpSession, HandlerType>,
7275
std::reference_wrapper<util::TagDecoratorFactory const> tagFactory,
7376
std::reference_wrapper<dosguard::DOSGuardInterface> dosGuard,
7477
std::shared_ptr<HandlerType> const& handler,
75-
boost::beast::flat_buffer buffer
78+
boost::beast::flat_buffer buffer,
79+
std::uint32_t maxWsSendingQueueSize
7680
)
7781
: impl::HttpBase<HttpSession, HandlerType>(
7882
ip,
@@ -84,6 +88,7 @@ class HttpSession : public impl::HttpBase<HttpSession, HandlerType>,
8488
)
8589
, stream_(std::move(socket))
8690
, tagFactory_(tagFactory)
91+
, maxWsSendingQueueSize_(maxWsSendingQueueSize)
8792
{
8893
}
8994

@@ -128,7 +133,8 @@ class HttpSession : public impl::HttpBase<HttpSession, HandlerType>,
128133
this->handler_,
129134
std::move(this->buffer_),
130135
std::move(this->req_),
131-
ConnectionBase::isAdmin()
136+
ConnectionBase::isAdmin(),
137+
maxWsSendingQueueSize_
132138
)
133139
->run();
134140
}

src/web/PlainWsSession.hpp

Lines changed: 25 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,8 @@ class PlainWsSession : public impl::WsBase<PlainWsSession, HandlerType> {
6262
* @param dosGuard The denial of service guard to use
6363
* @param handler The server handler to use
6464
* @param buffer Buffer with initial data received from the peer
65-
* @param isAdmin Whether the connection has admin privileges
65+
* @param isAdmin Whether the connection has admin privileges
66+
* @param maxSendingQueueSize The maximum size of the sending queue for websocket
6667
*/
6768
explicit PlainWsSession(
6869
boost::asio::ip::tcp::socket&& socket,
@@ -71,9 +72,17 @@ class PlainWsSession : public impl::WsBase<PlainWsSession, HandlerType> {
7172
std::reference_wrapper<dosguard::DOSGuardInterface> dosGuard,
7273
std::shared_ptr<HandlerType> const& handler,
7374
boost::beast::flat_buffer&& buffer,
74-
bool isAdmin
75+
bool isAdmin,
76+
std::uint32_t maxSendingQueueSize
7577
)
76-
: impl::WsBase<PlainWsSession, HandlerType>(ip, tagFactory, dosGuard, handler, std::move(buffer))
78+
: impl::WsBase<PlainWsSession, HandlerType>(
79+
ip,
80+
tagFactory,
81+
dosGuard,
82+
handler,
83+
std::move(buffer),
84+
maxSendingQueueSize
85+
)
7786
, ws_(std::move(socket))
7887
{
7988
ConnectionBase::isAdmin_ = isAdmin; // NOLINT(cppcoreguidelines-prefer-member-initializer)
@@ -107,6 +116,7 @@ class WsUpgrader : public std::enable_shared_from_this<WsUpgrader<HandlerType>>
107116
std::string ip_;
108117
std::shared_ptr<HandlerType> const handler_;
109118
bool isAdmin_;
119+
std::uint32_t maxWsSendingQueueSize_;
110120

111121
public:
112122
/**
@@ -120,6 +130,7 @@ class WsUpgrader : public std::enable_shared_from_this<WsUpgrader<HandlerType>>
120130
* @param buffer Buffer with initial data received from the peer. Ownership is transferred
121131
* @param request The request. Ownership is transferred
122132
* @param isAdmin Whether the connection has admin privileges
133+
* @param maxWsSendingQueueSize The maximum size of the sending queue for websocket
123134
*/
124135
WsUpgrader(
125136
boost::beast::tcp_stream&& stream,
@@ -129,7 +140,8 @@ class WsUpgrader : public std::enable_shared_from_this<WsUpgrader<HandlerType>>
129140
std::shared_ptr<HandlerType> const& handler,
130141
boost::beast::flat_buffer&& buffer,
131142
http::request<http::string_body> request,
132-
bool isAdmin
143+
bool isAdmin,
144+
std::uint32_t maxWsSendingQueueSize
133145
)
134146
: http_(std::move(stream))
135147
, buffer_(std::move(buffer))
@@ -139,6 +151,7 @@ class WsUpgrader : public std::enable_shared_from_this<WsUpgrader<HandlerType>>
139151
, ip_(std::move(ip))
140152
, handler_(handler)
141153
, isAdmin_(isAdmin)
154+
, maxWsSendingQueueSize_(maxWsSendingQueueSize)
142155
{
143156
}
144157

@@ -175,7 +188,14 @@ class WsUpgrader : public std::enable_shared_from_this<WsUpgrader<HandlerType>>
175188
boost::beast::get_lowest_layer(http_).expires_never();
176189

177190
std::make_shared<PlainWsSession<HandlerType>>(
178-
http_.release_socket(), ip_, tagFactory_, dosGuard_, handler_, std::move(buffer_), isAdmin_
191+
http_.release_socket(),
192+
ip_,
193+
tagFactory_,
194+
dosGuard_,
195+
handler_,
196+
std::move(buffer_),
197+
isAdmin_,
198+
maxWsSendingQueueSize_
179199
)
180200
->run(std::move(req_));
181201
}

src/web/Server.hpp

Lines changed: 34 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
#include <fmt/core.h>
4242

4343
#include <chrono>
44+
#include <cstdint>
4445
#include <exception>
4546
#include <functional>
4647
#include <memory>
@@ -84,6 +85,7 @@ class Detector : public std::enable_shared_from_this<Detector<PlainSessionType,
8485
std::shared_ptr<HandlerType> const handler_;
8586
boost::beast::flat_buffer buffer_;
8687
std::shared_ptr<impl::AdminVerificationStrategy> const adminVerification_;
88+
std::uint32_t maxWsSendingQueueSize_;
8789

8890
public:
8991
/**
@@ -95,21 +97,24 @@ class Detector : public std::enable_shared_from_this<Detector<PlainSessionType,
9597
* @param dosGuard The denial of service guard to use
9698
* @param handler The server handler to use
9799
* @param adminVerification The admin verification strategy to use
100+
* @param maxWsSendingQueueSize The maximum size of the sending queue for websocket
98101
*/
99102
Detector(
100103
tcp::socket&& socket,
101104
std::optional<std::reference_wrapper<boost::asio::ssl::context>> ctx,
102105
std::reference_wrapper<util::TagDecoratorFactory const> tagFactory,
103106
std::reference_wrapper<dosguard::DOSGuardInterface> dosGuard,
104107
std::shared_ptr<HandlerType> handler,
105-
std::shared_ptr<impl::AdminVerificationStrategy> adminVerification
108+
std::shared_ptr<impl::AdminVerificationStrategy> adminVerification,
109+
std::uint32_t maxWsSendingQueueSize
106110
)
107111
: stream_(std::move(socket))
108112
, ctx_(ctx)
109113
, tagFactory_(std::cref(tagFactory))
110114
, dosGuard_(dosGuard)
111115
, handler_(std::move(handler))
112116
, adminVerification_(std::move(adminVerification))
117+
, maxWsSendingQueueSize_(maxWsSendingQueueSize)
113118
{
114119
}
115120

@@ -167,14 +172,22 @@ class Detector : public std::enable_shared_from_this<Detector<PlainSessionType,
167172
tagFactory_,
168173
dosGuard_,
169174
handler_,
170-
std::move(buffer_)
175+
std::move(buffer_),
176+
maxWsSendingQueueSize_
171177
)
172178
->run();
173179
return;
174180
}
175181

176182
std::make_shared<PlainSessionType<HandlerType>>(
177-
stream_.release_socket(), ip, adminVerification_, tagFactory_, dosGuard_, handler_, std::move(buffer_)
183+
stream_.release_socket(),
184+
ip,
185+
adminVerification_,
186+
tagFactory_,
187+
dosGuard_,
188+
handler_,
189+
std::move(buffer_),
190+
maxWsSendingQueueSize_
178191
)
179192
->run();
180193
}
@@ -204,6 +217,7 @@ class Server : public std::enable_shared_from_this<Server<PlainSessionType, SslS
204217
std::shared_ptr<HandlerType> handler_;
205218
tcp::acceptor acceptor_;
206219
std::shared_ptr<impl::AdminVerificationStrategy> adminVerification_;
220+
std::uint32_t maxWsSendingQueueSize_;
207221

208222
public:
209223
/**
@@ -216,6 +230,7 @@ class Server : public std::enable_shared_from_this<Server<PlainSessionType, SslS
216230
* @param dosGuard The denial of service guard to use
217231
* @param handler The server handler to use
218232
* @param adminPassword The optional password to verify admin role in requests
233+
* @param maxWsSendingQueueSize The maximum size of the sending queue for websocket
219234
*/
220235
Server(
221236
boost::asio::io_context& ioc,
@@ -224,7 +239,8 @@ class Server : public std::enable_shared_from_this<Server<PlainSessionType, SslS
224239
util::TagDecoratorFactory tagFactory,
225240
dosguard::DOSGuardInterface& dosGuard,
226241
std::shared_ptr<HandlerType> handler,
227-
std::optional<std::string> adminPassword
242+
std::optional<std::string> adminPassword,
243+
std::uint32_t maxWsSendingQueueSize
228244
)
229245
: ioc_(std::ref(ioc))
230246
, ctx_(std::move(ctx))
@@ -233,6 +249,7 @@ class Server : public std::enable_shared_from_this<Server<PlainSessionType, SslS
233249
, handler_(std::move(handler))
234250
, acceptor_(boost::asio::make_strand(ioc))
235251
, adminVerification_(impl::make_AdminVerificationStrategy(std::move(adminPassword)))
252+
, maxWsSendingQueueSize_(maxWsSendingQueueSize)
236253
{
237254
boost::beast::error_code ec;
238255

@@ -286,7 +303,13 @@ class Server : public std::enable_shared_from_this<Server<PlainSessionType, SslS
286303
ctx_ ? std::optional<std::reference_wrapper<boost::asio::ssl::context>>{ctx_.value()} : std::nullopt;
287304

288305
std::make_shared<Detector<PlainSessionType, SslSessionType, HandlerType>>(
289-
std::move(socket), ctxRef, std::cref(tagFactory_), dosGuard_, handler_, adminVerification_
306+
std::move(socket),
307+
ctxRef,
308+
std::cref(tagFactory_),
309+
dosGuard_,
310+
handler_,
311+
adminVerification_,
312+
maxWsSendingQueueSize_
290313
)
291314
->run();
292315
}
@@ -348,14 +371,19 @@ make_HttpServer(
348371
throw std::logic_error("Admin config error, one method must be specified to authorize admin.");
349372
}
350373

374+
// If there are 200 transactions per ledger, a client that subscribes to everything will be sent 400+ feeds for
375+
// each ledger. We allow a client to fall 3 ledgers behind by default.
376+
auto const maxWsSendingQueueSize = serverConfig.valueOr("ws_max_sending_queue_size", 1500);
377+
351378
auto server = std::make_shared<HttpServer<HandlerType>>(
352379
ioc,
353380
std::move(expectedSslContext).value(),
354381
boost::asio::ip::tcp::endpoint{address, port},
355382
util::TagDecoratorFactory(config),
356383
dosGuard,
357384
handler,
358-
std::move(adminPassword)
385+
std::move(adminPassword),
386+
maxWsSendingQueueSize
359387
);
360388

361389
server->run();

src/web/SslHttpSession.hpp

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737

3838
#include <chrono>
3939
#include <cstddef>
40+
#include <cstdint>
4041
#include <functional>
4142
#include <memory>
4243
#include <string>
@@ -59,6 +60,7 @@ class SslHttpSession : public impl::HttpBase<SslHttpSession, HandlerType>,
5960
public std::enable_shared_from_this<SslHttpSession<HandlerType>> {
6061
boost::beast::ssl_stream<boost::beast::tcp_stream> stream_;
6162
std::reference_wrapper<util::TagDecoratorFactory const> tagFactory_;
63+
std::uint32_t maxWsSendingQueueSize_;
6264

6365
public:
6466
/**
@@ -72,6 +74,7 @@ class SslHttpSession : public impl::HttpBase<SslHttpSession, HandlerType>,
7274
* @param dosGuard The denial of service guard to use
7375
* @param handler The server handler to use
7476
* @param buffer Buffer with initial data received from the peer
77+
* @param maxWsSendingQueueSize The maximum size of the sending queue for websocket
7578
*/
7679
explicit SslHttpSession(
7780
tcp::socket&& socket,
@@ -81,7 +84,8 @@ class SslHttpSession : public impl::HttpBase<SslHttpSession, HandlerType>,
8184
std::reference_wrapper<util::TagDecoratorFactory const> tagFactory,
8285
std::reference_wrapper<dosguard::DOSGuardInterface> dosGuard,
8386
std::shared_ptr<HandlerType> const& handler,
84-
boost::beast::flat_buffer buffer
87+
boost::beast::flat_buffer buffer,
88+
std::uint32_t maxWsSendingQueueSize
8589
)
8690
: impl::HttpBase<SslHttpSession, HandlerType>(
8791
ip,
@@ -93,6 +97,7 @@ class SslHttpSession : public impl::HttpBase<SslHttpSession, HandlerType>,
9397
)
9498
, stream_(std::move(socket), ctx)
9599
, tagFactory_(tagFactory)
100+
, maxWsSendingQueueSize_(maxWsSendingQueueSize)
96101
{
97102
}
98103

@@ -173,7 +178,8 @@ class SslHttpSession : public impl::HttpBase<SslHttpSession, HandlerType>,
173178
this->handler_,
174179
std::move(this->buffer_),
175180
std::move(this->req_),
176-
ConnectionBase::isAdmin()
181+
ConnectionBase::isAdmin(),
182+
maxWsSendingQueueSize_
177183
)
178184
->run();
179185
}

0 commit comments

Comments
 (0)