Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitd601264

Browse files
committed
Added a C++11 enabled thread_group (cf. boost's thread_group).
1 parent6bff172 commitd601264

File tree

3 files changed

+176
-89
lines changed

3 files changed

+176
-89
lines changed
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
// Copyright (c) Glyn Matthews 2016.
2+
// (C) Copyright 2007-9 Anthony Williams
3+
// Distributed under the Boost Software License, Version 1.0.
4+
// (See accompanying file LICENSE_1_0.txt or copy at
5+
// http://www.boost.org/LICENSE_1_0.txt)
6+
7+
#ifndef BOOST_NETWORK_UTILS_THREAD_GROUP_INC
8+
#defineBOOST_NETWORK_UTILS_THREAD_GROUP_INC
9+
10+
#include<thread>
11+
#include<mutex>
12+
#include<memory>
13+
#include<list>
14+
#include<algorithm>
15+
16+
namespaceboost {
17+
namespacenetwork {
18+
namespaceutils {
19+
classthread_group {
20+
private:
21+
thread_group(thread_groupconst&);
22+
thread_group&operator=(thread_groupconst&);
23+
24+
public:
25+
thread_group() {}
26+
~thread_group() {}
27+
28+
template<typename F>
29+
std::thread*create_thread(F threadfunc) {
30+
std::lock_guard<std::mutex>guard(m);
31+
std::unique_ptr<std::thread>new_thread(newstd::thread(threadfunc));
32+
threads.push_back(std::move(new_thread));
33+
return threads.back().get();
34+
}
35+
36+
voidadd_thread(std::thread* thrd) {
37+
if (thrd) {
38+
std::lock_guard<std::mutex>guard(m);
39+
threads.push_back(std::unique_ptr<std::thread>(thrd));
40+
}
41+
}
42+
43+
voidremove_thread(std::thread* thrd) {
44+
std::lock_guard<std::mutex>guard(m);
45+
autoconst it =std::find_if(threads.begin(), threads.end(),
46+
[&thrd] (std::unique_ptr<std::thread> &arg) {
47+
return arg.get() == thrd;
48+
});
49+
if (it != threads.end()) {
50+
threads.erase(it);
51+
}
52+
}
53+
54+
voidjoin_all() {
55+
std::unique_lock<std::mutex>guard(m);
56+
57+
for (auto &thread : threads) {
58+
if (thread->joinable()) {
59+
thread->join();
60+
}
61+
}
62+
}
63+
64+
size_tsize()const {
65+
std::unique_lock<std::mutex>guard(m);
66+
return threads.size();
67+
}
68+
69+
private:
70+
std::list<std::unique_ptr<std::thread>> threads;
71+
mutable std::mutex m;
72+
};
73+
74+
}// namespace utils
75+
}// namespace network
76+
}// namespace boost
77+
78+
#endif// BOOST_NETWORK_UTILS_THREAD_GROUP_INC

‎boost/network/utils/thread_pool.hpp‎

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,21 +6,22 @@
66
// (See accompanying file LICENSE_1_0.txt or copy at
77
// http://www.boost.org/LICENSE_1_0.txt)
88

9+
#include<cstddef>
910
#include<memory>
1011
#include<functional>
1112
#include<boost/asio/io_service.hpp>
1213
#include<boost/function.hpp>
1314
#include<boost/network/tags.hpp>
1415
#include<boost/scope_exit.hpp>
15-
#include<boost/thread/thread.hpp>
16-
#include<cstddef>
16+
//#include <boost/thread/thread.hpp>
17+
#include<boost/network/utils/thread_group.hpp>
1718

1819
namespaceboost {
1920
namespacenetwork {
2021
namespaceutils {
2122

2223
typedef std::shared_ptr<boost::asio::io_service> io_service_ptr;
23-
typedef std::shared_ptr<boost::thread_group> worker_threads_ptr;
24+
typedef std::shared_ptr<utils::thread_group> worker_threads_ptr;
2425
typedef std::shared_ptr<boost::asio::io_service::work> sentinel_ptr;
2526

2627
template<classTag>
@@ -46,7 +47,7 @@ struct basic_thread_pool {
4647
sentinel_.reset();
4748
io_service_.reset();
4849
if (worker_threads_.get()) {
49-
worker_threads_->interrupt_all();
50+
//worker_threads_->interrupt_all();
5051
worker_threads_->join_all();
5152
}
5253
worker_threads_.reset();
@@ -59,7 +60,7 @@ struct basic_thread_pool {
5960
}
6061

6162
if (!worker_threads_.get()) {
62-
worker_threads_.reset(newboost::thread_group);
63+
worker_threads_.reset(newutils::thread_group);
6364
}
6465

6566
if (!sentinel_.get()) {

‎libs/network/example/http/hello_world_async_server_with_work_queue.cpp‎

Lines changed: 92 additions & 84 deletions
Original file line numberDiff line numberDiff line change
@@ -9,24 +9,38 @@
99

1010
#include<memory>
1111
#include<mutex>
12+
#include<chrono>
1213
#include<functional>
14+
#include<boost/network/utils/thread_group.hpp>
1315
#include<boost/network/include/http/server.hpp>
1416
#include<boost/network/uri.hpp>
15-
1617
#include<boost/asio.hpp>
17-
#include<boost/thread.hpp>
1818
#include<iostream>
1919
#include<list>
2020
#include<signal.h>
2121

22-
#defineLog(line) \
23-
do { \
24-
std::cout << line << std::endl; \
25-
}while (false)
22+
// This is needed to terminate the worker queue, and must be visible to the signal handler.
23+
bool running =true;
2624

2725
structhandler;
2826
typedef boost::network::http::server<handler> server;
2927

28+
structserver_data {
29+
boost::network::http::server<handler> server;
30+
31+
server_data(const server::options &options)
32+
: server(options) {}
33+
34+
voidrun() {
35+
server.run();
36+
}
37+
38+
voidstop() {
39+
running =false;
40+
server.stop();
41+
}
42+
};
43+
3044
/**
3145
* request + connection encapsulation (work item)
3246
*/
@@ -36,7 +50,7 @@ struct request_data {
3650

3751
typedef std::shared_ptr<request_data> pointer;
3852

39-
request_data(server::requestreq, server::connection_ptr conn)
53+
request_data(server::request req, server::connection_ptr conn)
4054
: req(std::move(req)), conn(std::move(conn)) {}
4155
};
4256

@@ -49,24 +63,24 @@ struct work_queue {
4963
list requests;
5064
std::mutex mutex;
5165

52-
inlinevoidput(const request_data::pointer&p_rd) {
66+
inlinevoidput(const request_data::pointer&request) {
5367
std::unique_lock<std::mutex>lock(mutex);
54-
requests.push_back(p_rd);
68+
requests.push_back(request);
5569
(void)lock;
5670
}
5771

5872
inline request_data::pointerget() {
5973
std::unique_lock<std::mutex>lock(mutex);
6074

61-
request_data::pointerp_ret;
75+
request_data::pointerrequest;
6276
if (!requests.empty()) {
63-
p_ret = requests.front();
77+
request = requests.front();
6478
requests.pop_front();
6579
}
6680

6781
(void)lock;
6882

69-
returnp_ret;
83+
returnrequest;
7084
}
7185
};
7286

@@ -92,11 +106,11 @@ struct handler {
92106
*
93107
* @param error
94108
* @param signal
95-
* @paramp_server_instance
109+
* @paramserver
96110
*/
97-
voidshut_me_down(const boost::system::error_code& error,int,
98-
std::shared_ptr<server> p_server_instance) {
99-
if (!error)p_server_instance->stop();
111+
voidshut_me_down(const boost::system::error_code& error,int signal,
112+
std::shared_ptr<server_data> server) {
113+
if (!error)server->stop();
100114
}
101115

102116
/**
@@ -105,85 +119,79 @@ void shut_me_down(const boost::system::error_code& error, int,
105119
* @param queue
106120
*/
107121
voidprocess_request(work_queue& queue) {
108-
while (!boost::this_thread::interruption_requested()) {
109-
request_data::pointerp_req(queue.get());
110-
if (p_req) {
122+
while (running) {
123+
request_data::pointerrequest(queue.get());
124+
if (request) {
111125

112126
// some heavy work!
113-
boost::this_thread::sleep(boost::posix_time::seconds(10));
127+
std::this_thread::sleep_for(std::chrono::seconds(10));
114128

115-
p_req->conn->set_status(server::connection::ok);
116-
p_req->conn->write("Hello, world!");
129+
request->conn->set_status(server::connection::ok);
130+
request->conn->write("Hello, world!");
117131
}
118132

119-
boost::this_thread::sleep(boost::posix_time::microseconds(1000));
133+
std::this_thread::sleep_for(std::chrono::microseconds(1000));
120134
}
121135
}
122136

123-
intmain(void) try {
124-
// the thread group
125-
std::shared_ptr<boost::thread_group>p_threads(
126-
std::make_shared<boost::thread_group>());
127-
128-
// setup asio::io_service
129-
std::shared_ptr<boost::asio::io_service>p_io_service(
130-
std::make_shared<boost::asio::io_service>());
131-
std::shared_ptr<boost::asio::io_service::work>p_work(
132-
std::make_shared<boost::asio::io_service::work>(
133-
boost::ref(*p_io_service)));
134-
135-
// io_service threads
136-
{
137-
int n_threads =5;
138-
while (0 < n_threads--) {
139-
p_threads->create_thread([=] () { p_io_service->run(); });
137+
intmain() {
138+
try {
139+
// the thread group
140+
autothreads(std::make_shared<boost::network::utils::thread_group>());
141+
142+
// setup asio::io_service
143+
autoio_service(std::make_shared<boost::asio::io_service>());
144+
autowork(std::make_shared<boost::asio::io_service::work>(std::ref(*io_service)));
145+
146+
// io_service threads
147+
{
148+
int n_threads =5;
149+
while (0 < n_threads--) {
150+
threads->create_thread([=] () { io_service->run(); });
151+
}
140152
}
141-
}
142153

143-
// the shared work queue
144-
work_queue queue;
154+
// the shared work queue
155+
work_queue queue;
145156

146-
// worker threads that will process the request; off the queue
147-
{
148-
int n_threads =5;
149-
while (0 < n_threads--) {
150-
p_threads->create_thread([&queue] () {process_request(queue); });
157+
// worker threads that will process the request; off the queue
158+
{
159+
int n_threads =5;
160+
while (0 < n_threads--) {
161+
threads->create_thread([&queue] () {process_request(queue); });
162+
}
151163
}
152-
}
153164

154-
// setup the async server
155-
handlerrequest_handler(queue);
156-
std::shared_ptr<server>p_server_instance(std::make_shared<server>(
157-
server::options(request_handler)
158-
.address("0.0.0.0")
159-
.port("8800")
160-
.io_service(p_io_service)
161-
.reuse_address(true)
162-
.thread_pool(std::make_shared<boost::network::utils::thread_pool>(
163-
2, p_io_service, p_threads))));
164-
165-
// setup clean shutdown
166-
boost::asio::signal_setsignals(*p_io_service, SIGINT, SIGTERM);
167-
signals.async_wait([=] (boost::system::error_codeconst &ec,int signal) {
168-
shut_me_down(ec, signal, p_server_instance);
169-
});
170-
171-
// run the async server
172-
p_server_instance->run();
173-
174-
// we are stopped - shutting down
175-
176-
p_threads->interrupt_all();
177-
178-
p_work.reset();
179-
p_io_service->stop();
180-
181-
p_threads->join_all();
182-
183-
Log("Terminated normally");
184-
exit(EXIT_SUCCESS);
185-
}
186-
catch (const std::exception& e) {
187-
Log("Abnormal termination - exception:" << e.what());
188-
exit(EXIT_FAILURE);
165+
// setup the async server
166+
handlerrequest_handler(queue);
167+
autoserver(std::make_shared<server_data>(
168+
server::options(request_handler)
169+
.address("0.0.0.0")
170+
.port("8800")
171+
.io_service(io_service)
172+
.reuse_address(true)
173+
.thread_pool(std::make_shared<boost::network::utils::thread_pool>(
174+
2, io_service, threads))));
175+
176+
// setup clean shutdown
177+
boost::asio::signal_setsignals(*io_service, SIGINT, SIGTERM);
178+
signals.async_wait([=] (boost::system::error_codeconst &ec,int signal) {
179+
shut_me_down(ec, signal, server);
180+
});
181+
182+
// run the async server
183+
server->run();
184+
185+
work.reset();
186+
io_service->stop();
187+
188+
threads->join_all();
189+
190+
std::cout <<"Terminated normally" << std::endl;
191+
exit(EXIT_SUCCESS);
192+
}
193+
catch (const std::exception& e) {
194+
std::cerr <<"Abnormal termination - exception:" << e.what() << std::endl;
195+
exit(EXIT_FAILURE);
196+
}
189197
}

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp