Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Thread group#590

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to ourterms of service andprivacy statement. We’ll occasionally send you account related emails.

Already on GitHub?Sign in to your account

Merged
deanberris merged 3 commits intocpp-netlib:masterfromglynos:thread_group
Jan 31, 2016
Merged
Show file tree
Hide file tree
Changes from1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
NextNext commit
Added a C++11 enabled thread_group (cf. boost's thread_group).
  • Loading branch information
@glynos
glynos committedJan 30, 2016
commitd601264851aaa5eb7a54323ae6eb9a2bf8420e99
78 changes: 78 additions & 0 deletionsboost/network/utils/thread_group.hpp
View file
Open in desktop
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
// Copyright (c) Glyn Matthews 2016.
// (C) Copyright 2007-9 Anthony Williams
// Distributed under the Boost Software License, Version 1.0.
// (See accompanying file LICENSE_1_0.txt or copy at
// http://www.boost.org/LICENSE_1_0.txt)

#ifndef BOOST_NETWORK_UTILS_THREAD_GROUP_INC
#defineBOOST_NETWORK_UTILS_THREAD_GROUP_INC

#include<thread>
#include<mutex>
#include<memory>
#include<list>
#include<algorithm>

namespaceboost {
namespacenetwork {
namespaceutils {
classthread_group {
private:
thread_group(thread_groupconst&);
thread_group&operator=(thread_groupconst&);

public:
thread_group() {}
~thread_group() {}

template<typename F>
std::thread*create_thread(F threadfunc) {
std::lock_guard<std::mutex>guard(m);
std::unique_ptr<std::thread>new_thread(newstd::thread(threadfunc));
threads.push_back(std::move(new_thread));
return threads.back().get();
}

voidadd_thread(std::thread* thrd) {
if (thrd) {
std::lock_guard<std::mutex>guard(m);
threads.push_back(std::unique_ptr<std::thread>(thrd));
}
}

voidremove_thread(std::thread* thrd) {
std::lock_guard<std::mutex>guard(m);
autoconst it =std::find_if(threads.begin(), threads.end(),
[&thrd] (std::unique_ptr<std::thread> &arg) {
return arg.get() == thrd;
});
if (it != threads.end()) {
threads.erase(it);
}
}

voidjoin_all() {
std::unique_lock<std::mutex>guard(m);

for (auto &thread : threads) {
if (thread->joinable()) {
thread->join();
}
}
}

size_tsize()const {
std::unique_lock<std::mutex>guard(m);
return threads.size();
}

private:
std::list<std::unique_ptr<std::thread>> threads;
mutable std::mutex m;
};

}// namespace utils
}// namespace network
}// namespace boost

#endif// BOOST_NETWORK_UTILS_THREAD_GROUP_INC
11 changes: 6 additions & 5 deletionsboost/network/utils/thread_pool.hpp
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -6,21 +6,22 @@
// (See accompanying file LICENSE_1_0.txt or copy at
// http://www.boost.org/LICENSE_1_0.txt)

#include<cstddef>
#include<memory>
#include<functional>
#include<boost/asio/io_service.hpp>
#include<boost/function.hpp>
#include<boost/network/tags.hpp>
#include<boost/scope_exit.hpp>
#include<boost/thread/thread.hpp>
#include<cstddef>
//#include <boost/thread/thread.hpp>
#include<boost/network/utils/thread_group.hpp>

namespaceboost {
namespacenetwork {
namespaceutils {

typedef std::shared_ptr<boost::asio::io_service> io_service_ptr;
typedef std::shared_ptr<boost::thread_group> worker_threads_ptr;
typedef std::shared_ptr<utils::thread_group> worker_threads_ptr;
typedef std::shared_ptr<boost::asio::io_service::work> sentinel_ptr;

template<classTag>
Expand All@@ -46,7 +47,7 @@ struct basic_thread_pool {
sentinel_.reset();
io_service_.reset();
if (worker_threads_.get()) {
worker_threads_->interrupt_all();
//worker_threads_->interrupt_all();
worker_threads_->join_all();
}
worker_threads_.reset();
Expand All@@ -59,7 +60,7 @@ struct basic_thread_pool {
}

if (!worker_threads_.get()) {
worker_threads_.reset(newboost::thread_group);
worker_threads_.reset(newutils::thread_group);
}

if (!sentinel_.get()) {
Expand Down
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -9,24 +9,38 @@

#include <memory>
#include <mutex>
#include <chrono>
#include <functional>
#include <boost/network/utils/thread_group.hpp>
#include <boost/network/include/http/server.hpp>
#include <boost/network/uri.hpp>

#include <boost/asio.hpp>
#include <boost/thread.hpp>
#include <iostream>
#include <list>
#include <signal.h>

#define Log(line) \
do { \
std::cout << line << std::endl; \
} while (false)
// This is needed to terminate the worker queue, and must be visible to the signal handler.
bool running = true;

struct handler;
typedef boost::network::http::server<handler> server;

struct server_data {
boost::network::http::server<handler> server;

server_data(const server::options &options)
: server(options) {}

void run() {
server.run();
}

void stop() {
running = false;
server.stop();
}
};

/**
* request + connection encapsulation (work item)
*/
Expand All@@ -36,7 +50,7 @@ struct request_data {

typedef std::shared_ptr<request_data> pointer;

request_data(server::requestreq, server::connection_ptr conn)
request_data(server::request req, server::connection_ptr conn)
: req(std::move(req)), conn(std::move(conn)) {}
};

Expand All@@ -49,24 +63,24 @@ struct work_queue {
list requests;
std::mutex mutex;

inline void put(const request_data::pointer&p_rd) {
inline void put(const request_data::pointer&request) {
std::unique_lock<std::mutex> lock(mutex);
requests.push_back(p_rd);
requests.push_back(request);
(void)lock;
}

inline request_data::pointer get() {
std::unique_lock<std::mutex> lock(mutex);

request_data::pointerp_ret;
request_data::pointerrequest;
if (!requests.empty()) {
p_ret = requests.front();
request = requests.front();
requests.pop_front();
}

(void)lock;

returnp_ret;
returnrequest;
}
};

Expand All@@ -92,11 +106,11 @@ struct handler {
*
* @param error
* @param signal
* @paramp_server_instance
* @paramserver
*/
void shut_me_down(const boost::system::error_code& error, int,
std::shared_ptr<server> p_server_instance) {
if (!error)p_server_instance->stop();
void shut_me_down(const boost::system::error_code& error, int signal,
std::shared_ptr<server_data> server) {
if (!error)server->stop();
}

/**
Expand All@@ -105,85 +119,79 @@ void shut_me_down(const boost::system::error_code& error, int,
* @param queue
*/
void process_request(work_queue& queue) {
while (!boost::this_thread::interruption_requested()) {
request_data::pointerp_req(queue.get());
if (p_req) {
while (running) {
request_data::pointerrequest(queue.get());
if (request) {

// some heavy work!
boost::this_thread::sleep(boost::posix_time::seconds(10));
std::this_thread::sleep_for(std::chrono::seconds(10));

p_req->conn->set_status(server::connection::ok);
p_req->conn->write("Hello, world!");
request->conn->set_status(server::connection::ok);
request->conn->write("Hello, world!");
}

boost::this_thread::sleep(boost::posix_time::microseconds(1000));
std::this_thread::sleep_for(std::chrono::microseconds(1000));
}
}

int main(void) try {
// the thread group
std::shared_ptr<boost::thread_group> p_threads(
std::make_shared<boost::thread_group>());

// setup asio::io_service
std::shared_ptr<boost::asio::io_service> p_io_service(
std::make_shared<boost::asio::io_service>());
std::shared_ptr<boost::asio::io_service::work> p_work(
std::make_shared<boost::asio::io_service::work>(
boost::ref(*p_io_service)));

// io_service threads
{
int n_threads = 5;
while (0 < n_threads--) {
p_threads->create_thread([=] () { p_io_service->run(); });
int main() {
try {
// the thread group
auto threads(std::make_shared<boost::network::utils::thread_group>());

// setup asio::io_service
auto io_service(std::make_shared<boost::asio::io_service>());
auto work(std::make_shared<boost::asio::io_service::work>(std::ref(*io_service)));

// io_service threads
{
int n_threads = 5;
while (0 < n_threads--) {
threads->create_thread([=] () { io_service->run(); });
}
}
}

// the shared work queue
work_queue queue;
// the shared work queue
work_queue queue;

// worker threads that will process the request; off the queue
{
int n_threads = 5;
while (0 < n_threads--) {
p_threads->create_thread([&queue] () { process_request(queue); });
// worker threads that will process the request; off the queue
{
int n_threads = 5;
while (0 < n_threads--) {
threads->create_thread([&queue] () { process_request(queue); });
}
}
}

// setup the async server
handler request_handler(queue);
std::shared_ptr<server> p_server_instance(std::make_shared<server>(
server::options(request_handler)
.address("0.0.0.0")
.port("8800")
.io_service(p_io_service)
.reuse_address(true)
.thread_pool(std::make_shared<boost::network::utils::thread_pool>(
2, p_io_service, p_threads))));

// setup clean shutdown
boost::asio::signal_set signals(*p_io_service, SIGINT, SIGTERM);
signals.async_wait([=] (boost::system::error_code const &ec, int signal) {
shut_me_down(ec, signal, p_server_instance);
});

// run the async server
p_server_instance->run();

// we are stopped - shutting down

p_threads->interrupt_all();

p_work.reset();
p_io_service->stop();

p_threads->join_all();

Log("Terminated normally");
exit(EXIT_SUCCESS);
}
catch (const std::exception& e) {
Log("Abnormal termination - exception:" << e.what());
exit(EXIT_FAILURE);
// setup the async server
handler request_handler(queue);
auto server(std::make_shared<server_data>(
server::options(request_handler)
.address("0.0.0.0")
.port("8800")
.io_service(io_service)
.reuse_address(true)
.thread_pool(std::make_shared<boost::network::utils::thread_pool>(
2, io_service, threads))));

// setup clean shutdown
boost::asio::signal_set signals(*io_service, SIGINT, SIGTERM);
signals.async_wait([=] (boost::system::error_code const &ec, int signal) {
shut_me_down(ec, signal, server);
});

// run the async server
server->run();

work.reset();
io_service->stop();

threads->join_all();

std::cout << "Terminated normally" << std::endl;
exit(EXIT_SUCCESS);
}
catch (const std::exception& e) {
std::cerr << "Abnormal termination - exception:" << e.what() << std::endl;
exit(EXIT_FAILURE);
}
}

[8]ページ先頭

©2009-2025 Movatter.jp