-
Notifications
You must be signed in to change notification settings - Fork 26
/
Copy pathserver.hpp
341 lines (294 loc) · 11.7 KB
/
server.hpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
//
// server.hpp
// ~~~~~~~~~~~~~~
//
// Unified channel class implementing TCP/UDP server types.
//
// Copyright (c) 2010 Serge Aleynikov <serge@aleynikov.org>
// Created: 2010-04-11
//
#pragma once
#include <boost/version.hpp>
#include <boost/asio.hpp>
#include <string>
#include <boost/noncopyable.hpp>
#include "channel.hpp"
namespace eixx {
//------------------------------------------------------------------------------
// channel_manager class
//------------------------------------------------------------------------------
/// Manages open channels so that they may be cleanly stopped when the server
/// needs to shut down.
template <class Connection>
class channel_manager : private boost::noncopyable
{
public:
using channel_ptr = boost::shared_ptr<Connection>;
/// Add the specified channel to the manager and start it.
void start(channel_ptr c, bool a_connected = false) {
m_channels.insert(c);
c->start(a_connected);
}
/// Stop the specified channel.
void stop(channel_ptr c) {
m_channels.erase(c);
c->stop();
c.reset();
}
/// Stop all channels.
void stop_all() {
std::for_each(m_channels.begin(), m_channels.end(),
std::bind(&Connection::stop, _1));
m_channels.clear();
}
private:
/// The managed channels.
std::set<channel_ptr> m_channels;
};
// Forward declaration
template<class Handler> class tcp_server;
template<class Handler> class uds_server;
//------------------------------------------------------------------------------
// server class
//------------------------------------------------------------------------------
/// The top-level class of the HTTP server.
template<class Handler>
class server : private boost::noncopyable
{
public:
using handler_type = Handler;
using pointer = boost::shared_ptr<server<Handler>>;
/// Construct the server to listen on the specified TCP address and port, and
/// serve up files from the given directory.
server(boost::asio::io_service& io, handler_type& h)
: m_io_service(io)
, m_request_handler(h)
{}
virtual ~server() {}
static pointer create(
boost::asio::io_service& a_svc,
handler_type& a_h,
const boost::property_tree::ptree& pt) {
std::string url = pt.get<std::string>("address", "");
size_t n = url.find(':');
std::string stype = url.substr(0, n);
if (stype.empty())
THROW_RUNTIME_ERROR("Missing 'address' configuration option");
typename channel<handler_type>::channel_type type =
channel<handler_type>::parse_channel_type(stype);
pointer p = create(a_svc, a_h, type);
p->init(pt);
return p;
}
static pointer create(
boost::asio::io_service& a_svc,
handler_type& a_h,
typename channel<handler_type>::channel_type a_type) {
switch (a_type) {
case channel<handler_type>::TCP:
return pointer(new tcp_server<handler_type>(a_svc, a_h));
case channel<handler_type>::UDS:
return pointer(new uds_server<handler_type>(a_svc, a_h));
default:
THROW_RUNTIME_ERROR("Unknown server type: " << a_type);
}
}
virtual void init(const boost::property_tree::ptree& pt)
throw(std::runtime_error) = 0;
/// Start the listening process.
virtual void start() {}
/// Run the server's io_service loop.
virtual void run() {
// The io_service::run() call will block until all asynchronous operations
// have finished. While the server is running, there is always at least one
// asynchronous operation outstanding: the asynchronous accept call waiting
// for new incoming channels.
m_io_service.run();
}
/// Stop the server.
virtual void stop() {
// Post a call to the stop function so that server::stop() is safe to call
// from any thread.
m_io_service.post(std::bind(&server::handle_stop, this));
}
// Event handlers
boost::function<void (channel<Handler>*)> on_connect;
boost::function<void (channel<Handler>*,
const boost::system::error_code&)> on_disconnect;
boost::function<void (channel<Handler>*,
const std::string&)> on_client_error;
boost::function<void (server<Handler>*,
const std::string&)> on_server_error;
protected:
/// Handle a request to stop the server.
virtual void handle_stop() {
m_channels.stop_all();
}
bool handle_accept_internal(const boost::system::error_code& e,
typename channel<Handler>::pointer con) {
if (e) {
if (!this->on_server_error)
THROW_RUNTIME_ERROR("Error in server::handle_accept: " << e.message());
this->on_server_error(static_cast<server<handler_type>*>(this), e.message());
return false;
}
Handler& h = con->handler();
con->on_disconnect = this->on_disconnect;
con->on_error = this->on_client_error;
if (this->on_connect)
this->on_connect(con.get());
this->m_channels.start(con, true);
return true;
}
/// The io_service used to perform asynchronous operations.
boost::asio::io_service& m_io_service;
/// The handler for all incoming requests.
Handler& m_request_handler;
/// The channel manager which owns all live channels.
channel_manager< channel<Handler> > m_channels;
};
/// The TCP server.
template<class Handler>
class tcp_server : public server< Handler >
{
public:
typedef server< Handler > base_t;
/// Construct the server to listen on the specified TCP address and port, and
/// serve up files from the given directory.
tcp_server(boost::asio::io_service& io, Handler& h)
: base_t(io, h)
, m_acceptor(this->m_io_service)
, m_new_channel(
tcp_channel<Handler>::create(this->m_io_service, h, channel<Handler>::TCP)
)
{
}
void init(const boost::property_tree::ptree& pt) throw(std::runtime_error) {
if (m_acceptor.is_open())
m_acceptor.close();
std::string url = pt.get<std::string>("address", "tcp://0.0.0.0:0");
std::string proto, addr, port, path;
if (!parse_url(url, proto, addr, port, path))
THROW_RUNTIME_ERROR("Invalid URL address: " << url);
if (proto != "tcp")
THROW_RUNTIME_ERROR("Expected 'tcp' protocol type!");
if (port.empty() || port == "0")
THROW_RUNTIME_ERROR("tcp_server invalid 'port' configuration: " << port);
// Open the acceptor with the option to reuse the address (i.e. SO_REUSEADDR).
boost::asio::ip::tcp::resolver resolver(this->m_io_service);
boost::asio::ip::tcp::resolver::query query(addr, port);
boost::asio::ip::tcp::endpoint end;
m_endpoint = *resolver.resolve(query);
if (m_endpoint == end)
THROW_RUNTIME_ERROR("Error resolving address");
}
void start() {
m_acceptor.open(m_endpoint.protocol());
m_acceptor.set_option(boost::asio::ip::tcp::acceptor::reuse_address(true));
m_acceptor.bind(m_endpoint);
m_acceptor.listen();
m_acceptor.async_accept(
static_cast<tcp_channel<Handler>&>(*m_new_channel).socket(),
std::bind(&tcp_server<Handler>::handle_accept,
this, boost::asio::placeholders::error));
}
private:
/// Acceptor used to listen for incoming channels.
boost::asio::ip::tcp::acceptor m_acceptor;
boost::asio::ip::tcp::endpoint m_endpoint;
/// The next channel to be accepted.
typename tcp_channel<Handler>::pointer m_new_channel;
/// Handle completion of an asynchronous accept operation.
void handle_accept(const boost::system::error_code& e) {
if (handle_accept_internal(e, m_new_channel)) {
m_new_channel =
tcp_channel<Handler>::create(this->m_io_service,
m_new_channel->handler(), channel<Handler>::TCP);
m_acceptor.async_accept(
static_cast<tcp_channel<Handler>&>(*m_new_channel).socket(),
std::bind(&tcp_server<Handler>::handle_accept,
this, boost::asio::placeholders::error));
}
}
/// Handle a request to stop the server.
virtual void handle_stop() {
// The server is stopped by cancelling all outstanding asynchronous
// operations. Once all operations have finished the io_service::run() call
// will exit.
m_acceptor.close();
base_t::handle_stop();
}
};
/// The Unix Domain Socket server.
template<class Handler>
class uds_server : public server< Handler >
{
public:
typedef server< Handler > base_t;
/// Construct the server to listen on the specified UDS filename, and
/// serve up files from the given directory.
uds_server(boost::asio::io_service& io, Handler& h)
: base_t(io, h)
, m_acceptor(this->m_io_service)
, m_new_channel(
uds_channel<Handler>::create(this->m_io_service, h, channel<Handler>::UDS)
)
{
}
~uds_server() {
if (!m_endpoint.path().empty())
::unlink(m_endpoint.path().c_str());
}
void init(const boost::property_tree::ptree& pt) throw(std::runtime_error) {
if (m_acceptor.is_open())
m_acceptor.close();
std::string url = pt.get<std::string>("address", "");
std::string proto, addr, port, path;
if (url.empty() || !parse_url(url, proto, addr, port, path))
THROW_RUNTIME_ERROR("Invalid URL address: '" << url << "'");
if (proto != "uds")
THROW_RUNTIME_ERROR("Expected 'uds' protocol type!");
if (path.empty())
THROW_RUNTIME_ERROR("uds_server empty 'address' configuration");
m_endpoint.path(path);
}
void start() {
m_acceptor.open(m_endpoint.protocol());
m_acceptor.set_option(
boost::asio::local::stream_protocol::acceptor::reuse_address(true));
m_acceptor.bind(m_endpoint);
m_acceptor.listen();
// Open the acceptor
m_acceptor.async_accept(
static_cast<uds_channel<Handler>&>(*m_new_channel).socket(),
std::bind(&uds_server<Handler>::handle_accept,
this, boost::asio::placeholders::error));
}
private:
/// Acceptor used to listen for incoming channels on Unix Domain Socket.
boost::asio::local::stream_protocol::acceptor m_acceptor;
boost::asio::local::stream_protocol::endpoint m_endpoint;
/// The next channel to be accepted.
typename uds_channel<Handler>::pointer m_new_channel;
/// Handle completion of an asynchronous accept operation.
void handle_accept(const boost::system::error_code& e) {
if (handle_accept_internal(e, m_new_channel)) {
m_new_channel =
uds_channel<Handler>::create(this->m_io_service,
m_new_channel->handler(), channel<Handler>::UDS);
m_acceptor.async_accept(
static_cast<uds_channel<Handler>&>(*m_new_channel).socket(),
std::bind(&uds_server<Handler>::handle_accept,
this, boost::asio::placeholders::error));
}
}
/// Handle a request to stop the server.
virtual void handle_stop() {
// The server is stopped by cancelling all outstanding asynchronous
// operations. Once all operations have finished the io_service::run() call
// will exit.
m_acceptor.close();
base_t::handle_stop();
}
};
} // namespace eixx