-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathserver.cpp
151 lines (136 loc) · 2.75 KB
/
server.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
// server thread and session management
//
#include "server.h"
#include <functional>
#include <iostream>
using namespace mc;
// Constructs a server.
//   mc     - upper bound on the number of sessions tracked at once
//            (compared against sessions_.size() in register_session()).
//   thread - when true, start() spawns a worker thread that runs process().
// No thread is created here; that happens in start().
server::server(size_t mc, bool thread)
    : max_connections_(mc),
      thread_(thread)
{}
// Posts a ctl_shutdown chunk (built with a null second argument) via push(),
// then, if a worker thread object exists, blocks until that thread finishes.
server::~server()
{
    push(data_chunk(data_chunk::ctl_shutdown, nullptr));

    if (t_.get() != nullptr) {
        t_->join();
    }
}
// Spawns the worker thread that runs process(), but only when the server was
// constructed with thread == true.
//
// Calling start() again while a worker already exists is a no-op: replacing
// t_ would destroy a still-joinable std::thread, which calls std::terminate().
void server::start()
{
    if (!thread_)
        return;             // thread-less mode: nothing to spawn
    if (t_.get())
        return;             // worker already running; do not replace it

    t_.reset(new std::thread(&server::process, this));
}
// Starts tracking session `s`, or deletes it when the connection limit
// (max_connections_) has already been reached.
void server::register_session(session *s)
{
    // Registering the same session pointer twice is a programming error.
    assert(sessions_.end() == sessions_.find(s));

    if (sessions_.size() < max_connections_) {
        // Store the current time as the session's last-activity stamp, in
        // case an idle timeout is enforced later.
        sessions_[s] = std::time(nullptr);
        return;
    }

    // Limit reached: dispose of the session instead of tracking it.
    delete s;
}
// Closes session `s` if it is currently tracked as active; otherwise does
// nothing.
void server::handle_close(session* s)
{
    const auto lookup = is_active_session(s);
    if (lookup.first) {
        close_session(lookup.second);
    }
}
// Hands buffer `b` to session `s` for processing.
// Data for a session that is not active is dropped after logging
// "inactive read". The session is closed when process_chunk() returns a
// false value or throws a std::exception (whose message is written to
// std::cerr).
void server::read_data(session* s, buffer b)
{
    const auto lookup = is_active_session(s);
    if (!lookup.first) {
        std::clog << "inactive read" << std::endl;
        return;
    }

    auto entry = lookup.second;
    try {
        if (entry->first->process_chunk(std::move(b)))
            return;                 // chunk accepted; session stays open
        close_session(entry);
    }
    catch (const std::exception& err) {
        std::cerr << err.what() << std::endl;
        close_session(entry);
    }
}
// Forwards control buffer `b` to session `s`.
// Silently ignored when the session is not active. The session is closed
// when control() returns a false value or throws a std::exception (whose
// message is written to std::cerr).
void server::control_session(session* s, buffer b)
{
    const auto lookup = is_active_session(s);
    if (!lookup.first) {
        return;
    }

    auto entry = lookup.second;
    try {
        if (entry->first->control(std::move(b)))
            return;                 // request accepted; session stays open
        close_session(entry);
    }
    catch (const std::exception& err) {
        std::cerr << err.what() << std::endl;
        close_session(entry);
    }
}
// Dispatches one queued chunk according to its type (v.t_).
// Returns false only for ctl_shutdown, telling the caller to stop processing;
// returns true for every other chunk type.
bool server::handle_chunk(const data_chunk& v)
{
    // Every chunk must name a session, except shutdown: the destructor builds
    // its shutdown chunk with nullptr, so demanding a session unconditionally
    // would abort debug builds during normal teardown.
    assert(v.s_ || v.t_ == data_chunk::ctl_shutdown);

    switch (v.t_)
    {
    case data_chunk::ctl_new_session:
        assert(v.b_.empty());       // a new-session chunk carries no payload
        register_session(v.s_);
        break;

    // NOTE(review): `v` is a const reference, so unless b_ is declared
    // `mutable` the std::move below yields a const rvalue and the buffer is
    // copied rather than moved - confirm against data_chunk's declaration.
    case data_chunk::ctl_read:
        read_data(v.s_, std::move(v.b_));
        break;

    case data_chunk::ctl_session_ctl:
        control_session(v.s_, std::move(v.b_));
        break;

    case data_chunk::ctl_close:
        assert(v.b_.empty());       // a close chunk carries no payload
        handle_close(v.s_);
        break;

    case data_chunk::ctl_shutdown:
        return false;

    default:
        assert(false);              // unknown chunk type
        break;
    }
    return true;
}
// Looks up `s` in sessions_.
// Returns {true, iterator} and refreshes the entry's timestamp when the
// session is tracked; returns {false, iterator} otherwise (the iterator is
// end() when the session was not found at all).
std::pair<bool, server::sessions::iterator> server::is_active_session(session* s)
{
    auto pos = sessions_.find(s);

    // Short-circuit keeps `s` from being dereferenced when it is unknown.
    // The fd_ comparison is the original "additional paranoid check".
    // NOTE(review): if sessions_ is keyed by the raw pointer with default
    // ordering, pos->first == s and this comparison can never differ - confirm.
    if (pos == sessions_.end() || pos->first->fd_ != s->fd_) {
        return std::make_pair(false, pos);
    }

    pos->second = std::time(nullptr);   // mark activity
    return std::make_pair(true, pos);
}
// Destroys the session referenced by `sit` and removes its entry from
// sessions_. `sit` must be a valid, dereferenceable iterator into sessions_.
void server::close_session(sessions::iterator sit)
{
    session* const doomed = sit->first;
    delete doomed;          // per the original note, this closes the socket
    sessions_.erase(sit);
}
// Deletes every tracked session and empties sessions_.
void server::cleanup()
{
    for (auto it = sessions_.begin(); it != sessions_.end(); ++it) {
        delete it->first;
    }
    sessions_.clear();
}
void server::process() //main process thread
{
while(true) {
auto v = q_.wait_next();
assert(v.s_);
if (!handle_chunk(v))
break;
}
cleanup();
}