Expand Up @@ -18,7 +18,7 @@ #include <signal.h> #define Log(line) \ do { std::cout << line << std::endl; } while(false) do { std::cout << line << std::endl; } while(false)struct handler; typedef boost::network::http::async_server<handler> server; Expand All @@ -28,66 +28,66 @@ typedef boost::network::http::async_server<handler> server; */ struct request_data { const server::request req; server::connection_ptr conn; const server::request req; server::connection_ptr conn; typedef boost::shared_ptr< request_data > pointer; typedef boost::shared_ptr< request_data > pointer; request_data(server::request const& req, const server::connection_ptr& conn) : req(req), conn(conn) { } request_data(server::request const& req, const server::connection_ptr& conn) : req(req), conn(conn) { }}; /** * A basic work queue */ struct work_queue { typedef std::list<request_data::pointer> list; typedef std::list<request_data::pointer> list; list requests; boost::mutex mutex; list requests; boost::mutex mutex; inline void put(const request_data::pointer& p_rd) { boost::unique_lock< boost::mutex > lock(mutex); requests.push_back(p_rd); (void)lock; } inline void put(const request_data::pointer& p_rd) { boost::unique_lock< boost::mutex > lock(mutex); requests.push_back(p_rd); (void)lock; } inline request_data::pointer get() { boost::unique_lock< boost::mutex > lock(mutex); inline request_data::pointer get() { boost::unique_lock< boost::mutex > lock(mutex); request_data::pointer p_ret; if (!requests.empty()) { p_ret = requests.front(); requests.pop_front(); } request_data::pointer p_ret; if (!requests.empty()) { p_ret = requests.front(); requests.pop_front(); } (void)lock; (void)lock; return p_ret; } return p_ret; }}; struct handler { work_queue& queue; handler(work_queue& queue) : queue(queue) { } /** * Feed the work queue * * @param req * @param conn */ void operator()(server::request const& req, const server::connection_ptr& conn) { queue.put(boost::make_shared<request_data>(req, conn)); } work_queue& queue; handler(work_queue& queue) : queue(queue) { } /** * Feed the work queue * * @param req * @param conn */ void operator()(server::request const& req, const server::connection_ptr& conn) { queue.put(boost::make_shared<request_data>(req, conn)); }}; /** Expand All @@ -98,11 +98,11 @@ struct handler * @param p_server_instance */ void shut_me_down( const boost::system::error_code& error , int signal, boost::shared_ptr< server > p_server_instance) const boost::system::error_code& error , int signal, boost::shared_ptr< server > p_server_instance){ if (!error) p_server_instance->stop(); if (!error) p_server_instance->stop();} /** Expand All @@ -112,89 +112,89 @@ void shut_me_down( */ void process_request(work_queue& queue) { while(!boost::this_thread::interruption_requested()) { request_data::pointer p_req(queue.get()); if (p_req) { while(!boost::this_thread::interruption_requested()) { request_data::pointer p_req(queue.get()); if (p_req) { // some heavy work! boost::this_thread::sleep(boost::posix_time::seconds(10)); // some heavy work! boost::this_thread::sleep(boost::posix_time::seconds(10)); p_req->conn->set_status(server::connection::ok); p_req->conn->write("Hello, world!"); } p_req->conn->set_status(server::connection::ok); p_req->conn->write("Hello, world!"); } boost::this_thread::sleep(boost::posix_time::microseconds(1000)); } boost::this_thread::sleep(boost::posix_time::microseconds(1000)); }} int main(void) try { // the thread group boost::shared_ptr< boost::thread_group > p_threads( boost::make_shared< boost::thread_group>()); // setup asio::io_service boost::shared_ptr< boost::asio::io_service > p_io_service( boost::make_shared< boost::asio::io_service >()); boost::shared_ptr< boost::asio::io_service::work > p_work( boost::make_shared< boost::asio::io_service::work >( boost::ref(*p_io_service))); // io_service threads { int n_threads = 5; while(0 < n_threads--) { p_threads->create_thread( boost::bind(&boost::asio::io_service::run, p_io_service)); } } // the shared work queue work_queue queue; // worker threads that will process the request; off the queue { int n_threads = 5; while(0 < n_threads--) { p_threads->create_thread( boost::bind(process_request, boost::ref(queue))); } } // setup the async server handler request_handler(queue); boost::shared_ptr< server > p_server_instance( boost::make_shared<server>( server::options(request_handler). address("0.0.0.0") .port("8800") .io_service(p_io_service) .reuse_address(true) .thread_pool( boost::make_shared<boost::network::utils::thread_pool>( 2 , p_io_service, p_threads)))); // setup clean shutdown boost::asio::signal_set signals(*p_io_service, SIGINT, SIGTERM); signals.async_wait(boost::bind(shut_me_down, _1, _2, p_server_instance)); // run the async server p_server_instance->run(); // we are stopped - shutting down p_threads->interrupt_all(); p_work.reset(); p_io_service->stop(); p_threads->join_all(); Log("Terminated normally"); exit(EXIT_SUCCESS); // the thread group boost::shared_ptr< boost::thread_group > p_threads( boost::make_shared< boost::thread_group>()); // setup asio::io_service boost::shared_ptr< boost::asio::io_service > p_io_service( boost::make_shared< boost::asio::io_service >()); boost::shared_ptr< boost::asio::io_service::work > p_work( boost::make_shared< boost::asio::io_service::work >( boost::ref(*p_io_service))); // io_service threads { int n_threads = 5; while(0 < n_threads--) { p_threads->create_thread( boost::bind(&boost::asio::io_service::run, p_io_service)); } } // the shared work queue work_queue queue; // worker threads that will process the request; off the queue { int n_threads = 5; while(0 < n_threads--) { p_threads->create_thread( boost::bind(process_request, boost::ref(queue))); } } // setup the async server handler request_handler(queue); boost::shared_ptr< server > p_server_instance( boost::make_shared<server>( server::options(request_handler). address("0.0.0.0") .port("8800") .io_service(p_io_service) .reuse_address(true) .thread_pool( boost::make_shared<boost::network::utils::thread_pool>( 2 , p_io_service, p_threads)))); // setup clean shutdown boost::asio::signal_set signals(*p_io_service, SIGINT, SIGTERM); signals.async_wait(boost::bind(shut_me_down, _1, _2, p_server_instance)); // run the async server p_server_instance->run(); // we are stopped - shutting down p_threads->interrupt_all(); p_work.reset(); p_io_service->stop(); p_threads->join_all(); Log("Terminated normally"); exit(EXIT_SUCCESS);} catch(const std::exception& e) { Log("Abnormal termination - exception:"<<e.what()); exit(EXIT_FAILURE); Log("Abnormal termination - exception:"<<e.what()); exit(EXIT_FAILURE);}