Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitf1deeda

Browse files
committed
Completes (Untested) Asynchronous Connection
This commit actually completes the asynchronous connectionimplementation. What remains to be done are:- Complete the tests, make sure the server doesn't fail due to some serious asynchoronous handling issues.- Write a static file server that uses HTTP/1.1 streamingIn this commit the whole system builds, but there needs to be a check onwhether the thing actually works. In this commit are: * A means for reading and scheduling an asynchronous read handler. Documentation is required on the API of the read handler. * A fully asynchronous event handling and thread-pool dispatching of user supplied handlers.Expect a lot more changes starting this commit so hold your hats folksit's going to be a bumpy ride.
1 parent89f73f6 commitf1deeda

File tree

1 file changed

+231
-33
lines changed

1 file changed

+231
-33
lines changed

‎boost/network/protocol/http/server/async_connection.hpp‎

Lines changed: 231 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,9 @@
2121
#include<boost/network/protocol/http/server/request_parser.hpp>
2222
#include<boost/range/iterator_range.hpp>
2323
#include<boost/spirit/include/qi.hpp>
24+
#include<boost/optional.hpp>
25+
#include<boost/utility/typed_in_place_factory.hpp>
26+
#include<boost/thread/locks.hpp>
2427
#include<list>
2528
#include<vector>
2629
#include<iterator>
@@ -64,6 +67,55 @@ namespace boost { namespace network { namespace http {
6467

6568
typedeftypename string<Tag>::type string_type;
6669
typedef basic_request<Tag> request;
70+
typedef shared_ptr<async_connection> connection_ptr;
71+
72+
private:
73+
staticcharconst *status_message(status_t status) {
74+
staticcharconst
75+
ok_[] ="OK"
76+
, created_[] ="Created"
77+
, accepted_[] ="Accepted"
78+
, no_content_[] ="No Content"
79+
, multiple_choices_[] ="Multiple Choices"
80+
, moved_permanently_[] ="Moved Permanently"
81+
, moved_temporarily_[] ="Moved Temporarily"
82+
, not_modified_[] ="Not Modified"
83+
, bad_request_[] ="Bad Request"
84+
, unauthorized_[] ="Unauthorized"
85+
, forbidden_[] ="Fobidden"
86+
, not_found_[] ="Not Found"
87+
, not_supported_[] ="Not Supported"
88+
, not_acceptable_[] ="Not Acceptable"
89+
, internal_server_error_[] ="Internal Server Error"
90+
, not_implemented_[] ="Not Implemented"
91+
, bad_gateway_[] ="Bad Gateway"
92+
, service_unavailable_[] ="Service Unavailable"
93+
, unknown_[] ="Unknown"
94+
;
95+
switch(status) {
96+
case ok:return ok_;
97+
case created:return created_;
98+
case accepted:return accepted_;
99+
case no_content:return no_content_;
100+
case multiple_choices:return multiple_choices_;
101+
case moved_permanently:return moved_permanently_;
102+
case moved_temporarily:return moved_temporarily_;
103+
case not_modified:return not_modified_;
104+
case bad_request:return bad_request_;
105+
case unauthorized:return unauthorized_;
106+
case forbidden:return forbidden_;
107+
case not_found:return not_found_;
108+
case not_supported:return not_supported_;
109+
case not_acceptable:return not_acceptable_;
110+
case internal_server_error:return internal_server_error_;
111+
case not_implemented:return not_implemented_;
112+
case bad_gateway:return bad_gateway_;
113+
case service_unavailable:return service_unavailable_;
114+
default:return unknown_;
115+
}
116+
}
117+
118+
public:
67119

68120
async_connection(
69121
asio::io_service & io_service
@@ -82,21 +134,26 @@ namespace boost { namespace network { namespace http {
82134

83135
/** Function: template <class Range> set_headers(Range headers)
84136
* Precondition: headers have not been sent yet
85-
* Postcondition: headers have been linearized to a buffer.
137+
* Postcondition: headers have been linearized to a buffer,
138+
* and assumed to have been sent already when the function exits
86139
* Throws: std::logic_error in case the headers have already been sent.
87140
*
88141
* A call to set_headers takes a Range where each element models the
89142
* Header concept. This Range will be linearized onto a buffer, which is
90143
* then sent as soon as the first call to `write` or `flush` commences.
91144
*/
92145
template<classRange>
93-
voidset_headers(Range headers,bool immediate =true) {
94-
if (headers_already_sent)
95-
boost::throw_exception(std::logic_error("Headers have already been sent."));
146+
voidset_headers(Range headers) {
147+
lock_guardlock(headers_mutex);
148+
149+
if (headers_already_sent)boost::throw_exception(std::logic_error("Headers have already been sent."));
150+
151+
if (error_encountered)boost::throw_exception(boost::system::system_error(*error_encountered));
96152

97153
bool commit =false;
98154
BOOST_SCOPE_EXIT_TPL((&commit)(&headers_already_sent)) {
99155
if (!commit) headers_already_sent =false;
156+
else headers_already_sent =true;
100157
} BOOST_SCOPE_EXIT_END
101158

102159
typedef constants<Tag> consts;
@@ -113,50 +170,94 @@ namespace boost { namespace network { namespace http {
113170
stream <<consts::crlf();
114171
}
115172
stream <<consts::crlf();
116-
if (immediate)write_headers_only();
173+
174+
write_headers_only(
175+
boost::bind(
176+
&async_connection<Tag,Handler>::do_nothing
177+
, async_connection<Tag,Handler>::shared_from_this()
178+
));
117179

118180
commit =true;
119181
}
120182

121183
voidset_status(status_t new_status) {
184+
lock_guardlock(headers_mutex);
185+
if (headers_already_sent)boost::throw_exception(std::logic_error("Headers have already been sent, cannot reset status."));
186+
if (error_encountered)boost::throw_exception(boost::system::system_error(*error_encountered));
187+
122188
status = new_status;
123189
}
124190

125191
template<classRange>
126192
voidwrite(Rangeconst & range) {
127-
write_impl(
128-
boost::make_iterator_range(range)
129-
,boost::bind(
193+
if (error_encountered)boost::throw_exception(boost::system::system_error(*error_encountered));
194+
195+
boost::function<void(boost::system::error_code)> f =
196+
boost::bind(
130197
&async_connection<Tag,Handler>::default_error
131198
, async_connection<Tag,Handler>::shared_from_this()
132-
, _1
133-
)
199+
, _1);
200+
201+
write_impl(
202+
boost::make_iterator_range(range)
203+
, f
134204
);
135205
}
136206

137207
template<classRange,classCallback>
138208
voidwrite(Rangeconst & range, Callbackconst & callback) {
139-
write_impl(
140-
boost::make_iterator_range(range)
141-
, callback
142-
);
209+
if (error_encountered)boost::throw_exception(boost::system::system_error(*error_encountered));
210+
boost::function<void(boost::system::error_code)> f = callback;
211+
write_impl(boost::make_iterator_range(range), callback);
212+
}
213+
214+
private:
215+
typedef boost::array<char, BOOST_NETWORK_HTTP_SERVER_CONNECTION_BUFFER_SIZE> buffer_type;
216+
217+
public:
218+
typedef iterator_range<buffer_type::const_iterator> input_range;
219+
typedef boost::function<void(input_range, boost::system::error_code, std::size_t, connection_ptr)> read_callback_function;
220+
221+
voidread(read_callback_function callback) {
222+
if (error_encountered)boost::throw_exception(boost::system::system_error(*error_encountered));
223+
socket().async_read_some(
224+
asio::buffer(read_buffer_)
225+
, strand.wrap(
226+
boost::bind(
227+
&async_connection<Tag,Handler>::wrap_read_handler
228+
, async_connection<Tag,Handler>::shared_from_this()
229+
, callback
230+
, asio::placeholders::error, asio::placeholders::bytes_transferred)));
143231
}
144232

145233
asio::ip::tcp::socket &socket() {return socket_; }
146234
utils::thread_pool &thread_pool() {return thread_pool_; }
235+
boolhas_error() {return (!!error_encountered); }
236+
optional<boost::system::system_error>error()
237+
{return error_encountered; }
147238

148239
private:
149240

241+
voidwrap_read_handler(read_callback_function callback, boost::system::error_codeconst & ec, std::size_t bytes_transferred) {
242+
if (ec) error_encountered = in_place<boost::system::system_error>(ec);
243+
thread_pool().post(
244+
boost::bind(
245+
callback
246+
, ec
247+
, bytes_transferred
248+
, async_connection<Tag,Handler>::shared_from_this()));
249+
}
250+
150251
voiddefault_error(boost::system::error_codeconst & ec) {
151-
// TODO implement a sane default here, for now ignore the error
252+
error_encountered = in_place<boost::system::system_error>(ec);
152253
}
153254

154-
typedef boost::array<char, BOOST_NETWORK_HTTP_SERVER_CONNECTION_BUFFER_SIZE> buffer_type;
155255
typedef boost::array<char, BOOST_NETWORK_HTTP_SERVER_CONNECTION_BUFFER_SIZE> array;
156256
typedef std::list<shared_ptr<array> > array_list;
157257
typedef boost::shared_ptr<array_list> shared_array_list;
158258
typedef boost::shared_ptr<std::vector<asio::const_buffer> > shared_buffers;
159259
typedef request_parser<Tag> request_parser_type;
260+
typedef boost::lock_guard<boost::mutex> lock_guard;
160261

161262
asio::ip::tcp::socket socket_;
162263
asio::io_service::strand strand;
@@ -165,12 +266,14 @@ namespace boost { namespace network { namespace http {
165266
bool headers_already_sent;
166267
asio::streambuf headers_buffer;
167268

269+
boost::mutex headers_mutex;
168270
buffer_type read_buffer_;
169-
boost::uint16_t status;
271+
status_t status;
170272
request_parser_type parser;
171273
request request_;
172274
buffer_type::iterator new_start;
173275
string_type partial_parsed;
276+
optional<boost::system::system_error> error_encountered;
174277

175278
template<class,class>friendstructasync_server_base;
176279

@@ -317,12 +420,45 @@ namespace boost { namespace network { namespace http {
317420
BOOST_ASSERT(false &&"This is a bug, report to the cpp-netlib devel mailing list!");
318421
std::abort();
319422
}
423+
}else {
424+
error_encountered = in_place<boost::system::system_error>(ec);
320425
}
321-
// TODO log the error
322426
}
323427

324428
voidclient_error() {
325-
//FIXME write out a client request error
429+
status = bad_request;
430+
write_first_line(
431+
strand.wrap(
432+
boost::bind(
433+
&async_connection<Tag,Handler>::client_error_first_line_written
434+
, async_connection<Tag,Handler>::shared_from_this()
435+
, asio::placeholders::error
436+
, asio::placeholders::bytes_transferred)));
437+
}
438+
439+
voidclient_error_first_line_written(boost::system::error_codeconst & ec, std::size_t bytes_transferred) {
440+
staticcharconst * bad_request =
441+
"HTTP/1.0 400 Bad Request\r\nConnection: close\r\nContent-Type: text/plain\r\nContent-Length: 12\r\n\r\nBad Request.";
442+
443+
asio::async_write(
444+
socket()
445+
,asio::buffer(bad_request,115)
446+
, strand.wrap(
447+
boost::bind(
448+
&async_connection<Tag,Handler>::client_error_sent
449+
, async_connection<Tag,Handler>::shared_from_this()
450+
, asio::placeholders::error
451+
, asio::placeholders::bytes_transferred)));
452+
}
453+
454+
voidclient_error_sent(boost::system::error_codeconst & ec, std::size_t bytes_transferred) {
455+
if (!ec) {
456+
boost::system::error_code ignored;
457+
socket().shutdown(asio::ip::tcp::socket::shutdown_both, ignored);
458+
socket().close(ignored);
459+
}else {
460+
error_encountered = in_place<boost::system::system_error>(ec);
461+
}
326462
}
327463

328464
voidparse_headers(string_type & input,typename request::headers_container_type & container) {
@@ -339,38 +475,99 @@ namespace boost { namespace network { namespace http {
339475
, headers
340476
);
341477
}
342-
template<classRange,classCallback>
343-
voidwrite_headers(Range range, Callback callback) {
344-
// TODO send out the headers, then once that's done
345-
// call the write again on the range and callback
478+
479+
voiddo_nothing() {}
480+
481+
template<classRange>
482+
voidcontinue_write(Range range, boost::function<void(boost::system::error_code)> callback) {
483+
thread_pool().post(
484+
boost::bind(
485+
&async_connection<Tag,Handler>::write_impl<Range>
486+
, async_connection<Tag,Handler>::shared_from_this()
487+
, range, callback));
346488
}
347489

348-
voidwrite_headers_only() {
490+
template<classCallback>
491+
voidwrite_first_line(Callback callback) {
492+
std::vector<asio::const_buffer> buffers;
493+
typedef constants<Tag> consts;
494+
typename ostringstream<Tag>::type first_line_stream;
495+
first_line_stream
496+
<<consts::http_slash() <<1<<consts::dot() <<1 <<consts::space()
497+
<< status <<consts::space() <<status_message(status)
498+
<<consts::space()
499+
;
500+
std::string first_line = first_line_stream.str();
501+
buffers.push_back(asio::buffer(first_line));
502+
asio::async_write(
503+
socket()
504+
, buffers
505+
, callback);
349506
}
350507

351-
voidhandle_write_headers(boost::system::error_codeconst & ec) {
352-
if (ec) {
353-
// TODO signal somehow that there was an error so that subsequent
354-
// calls to write would throw an exception
355-
return;
508+
voidwrite_headers_only(boost::function<void()> callback) {
509+
write_first_line(
510+
strand.wrap(
511+
boost::bind(
512+
&async_connection<Tag,Handler>::handle_first_line_written
513+
, async_connection<Tag,Handler>::shared_from_this()
514+
, callback
515+
, asio::placeholders::error
516+
, asio::placeholders::bytes_transferred)));
517+
}
518+
519+
voidhandle_first_line_written(boost::function<void()> callback, boost::system::error_codeconst & ec, std::size_t bytes_transferred) {
520+
lock_guardlock(headers_mutex);
521+
if (!ec) {
522+
asio::async_write(
523+
socket()
524+
, headers_buffer
525+
, strand.wrap(
526+
boost::bind(
527+
&async_connection<Tag,Handler>::handle_write_headers
528+
, async_connection<Tag,Handler>::shared_from_this()
529+
, callback
530+
, asio::placeholders::error
531+
, asio::placeholders::bytes_transferred)));
532+
}else {
533+
error_encountered = in_place<boost::system::system_error>(ec);
534+
}
535+
}
536+
537+
voidhandle_write_headers(boost::function<void()> callback, boost::system::error_codeconst & ec, std::size_t bytes_transferred) {
538+
lock_guardlock(headers_mutex);
539+
if (!ec) {
540+
headers_buffer.consume(headers_buffer.size());
541+
headers_already_sent =true;
542+
callback();
543+
}else {
544+
error_encountered = in_place<boost::system::system_error>(ec);
356545
}
357-
headers_already_sent =true;
358546
}
359547

360548
voidhandle_write(
361549
boost::function<void(boost::system::error_codeconst &)> callback
362550
, shared_array_list temporaries
363551
, shared_buffers buffers
364552
, boost::system::error_codeconst & ec
553+
, std::size_t bytes_transferred
365554
) {
366555
// we want to forget the temporaries and buffers
367556
thread_pool().post(boost::bind(callback, ec));
368557
}
369558

370-
template<classRange,classCallback>
371-
voidwrite_impl(Range range,Callback callback) {
559+
template<classRange>
560+
voidwrite_impl(Range range,boost::function<void(boost::system::error_code)> callback) {
372561
if (!headers_already_sent) {
373-
write_headers(range, callback);
562+
boost::function<void(boost::system::error_code)> callback_function =
563+
callback;
564+
565+
write_headers_only(
566+
boost::bind(
567+
&async_connection<Tag,Handler>::continue_write<Range>
568+
, async_connection<Tag,Handler>::shared_from_this()
569+
, range, callback_function
570+
));
374571
return;
375572
}
376573

@@ -431,6 +628,7 @@ namespace boost { namespace network { namespace http {
431628
, temporaries
432629
, buffers// keep these alive until the handler is called!
433630
, boost::asio::placeholders::error
631+
, boost::asio::placeholders::bytes_transferred
434632
)
435633
)
436634
);

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp