Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit7bb7b5b

Browse files
authored
Merge pull request#1690 from ericniebler/execpools-bulk-customization
fix the execpools' customization of `bulk` and `bulk_chunked`
2 parents1907427 +bcd5654 commit7bb7b5b

File tree

4 files changed

+109
-83
lines changed

4 files changed

+109
-83
lines changed

‎include/exec/static_thread_pool.hpp‎

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,8 @@ namespace exec {
8888

8989
template<class>
9090
structnot_a_sender {
91+
using__t = not_a_sender;
92+
using __id = not_a_sender;
9193
using sender_concept =sender_t;
9294
};
9395

@@ -1298,9 +1300,9 @@ namespace exec {
12981300
template<classF>
12991301
voidapply(F f) {
13001302
std::visit(
1301-
[&](auto& tupl) ->void {
1302-
ifconstexpr (same_as<__decay_t<decltype(tupl)>, std::monostate>) {
1303-
std::terminate();
1303+
[&]<classTuple>(Tuple& tupl) ->void {
1304+
ifconstexpr (same_as<Tuple, std::monostate>) {
1305+
STDEXEC_TERMINATE();
13041306
}else {
13051307
std::apply([&](auto&... args) ->void {f(args...); }, tupl);
13061308
}
@@ -1356,15 +1358,13 @@ namespace exec {
13561358

13571359
shared_state& state = shared_state_;
13581360

1359-
ifconstexpr (MayThrow) {
1360-
STDEXEC_TRY {
1361-
state.data_.templateemplace<tuple_t>(static_cast<As&&>(as)...);
1362-
}
1363-
STDEXEC_CATCH_ALL {
1361+
STDEXEC_TRY {
1362+
state.data_.templateemplace<tuple_t>(static_cast<As&&>(as)...);
1363+
}
1364+
STDEXEC_CATCH_ALL {
1365+
ifconstexpr (MayThrow) {
13641366
stdexec::set_error(std::move(state.rcvr_),std::current_exception());
13651367
}
1366-
}else {
1367-
state.data_.templateemplace<tuple_t>(static_cast<As&&>(as)...);
13681368
}
13691369

13701370
if (state.shape_) {

‎include/execpools/thread_pool_base.hpp‎

Lines changed: 71 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,46 @@ namespace execpools {
4242
template<classDerivedPoolType_,classReceiverId>
4343
friendstructoperation;
4444

45+
template<class>
46+
structnot_a_sender {
47+
using sender_concept = stdexec::sender_t;
48+
};
49+
4550
public:
51+
structscheduler;
52+
53+
structdomain : stdexec::default_domain {
54+
using__t = domain;
55+
using __id = domain;
56+
57+
template<stdexec::sender_expr_for<stdexec::bulk_chunked_t> Sender,classEnv>
58+
staticconstexprautotransform_sender(stdexec::set_value_t, Sender&& sndr,const Env& env) {
59+
auto& [tag, data, child] = sndr;
60+
auto& [pol, shape, fun] = data;
61+
62+
ifconstexpr (stdexec::__completes_on<decltype(child), scheduler, Env>) {
63+
auto sch =
64+
stdexec::get_completion_scheduler<stdexec::set_value_t>(stdexec::get_env(child), env);
65+
usingsender_t =
66+
scheduler::templatebulk_sender_t<decltype(child),decltype(shape),decltype(fun)>;
67+
returnsender_t{
68+
*sch.pool_,
69+
stdexec::__forward_like<Sender>(child),
70+
shape,
71+
stdexec::__forward_like<Sender>(fun)};
72+
}else {
73+
static_assert(
74+
stdexec::__completes_on<Sender, scheduler, Env>,
75+
"Unable to dispatch bulk work to the static_thread_pool. The predecessor sender"
76+
"is not able to provide a static_thread_pool scheduler.");
77+
return not_a_sender<stdexec::__name_of<Sender>>();
78+
}
79+
}
80+
81+
template<stdexec::sender_expr_for<stdexec::bulk_unchunked_t> Sender,classEnv>
82+
staticconstexprautotransform_sender(stdexec::set_value_t, Sender&& sndr,const Env& env);
83+
};
84+
4685
structscheduler {
4786
private:
4887
template<classDerivedPoolType_,classReceiverId>
@@ -62,6 +101,11 @@ namespace execpools {
62101
return pool_.get_scheduler();
63102
}
64103

104+
template<classCPO>
105+
autoquery(stdexec::get_completion_domain_t<CPO>)constnoexcept -> domain {
106+
return {};
107+
}
108+
65109
autoget_env()constnoexcept ->const sender& {
66110
return *this;
67111
}
@@ -111,29 +155,6 @@ namespace execpools {
111155
std::atomic<std::uint32_t> thread_with_exception_{0};
112156
std::exception_ptr exception_;
113157

114-
// Splits `n` into `size` chunks distributing `n % size` evenly between ranks.
115-
// Returns `[begin, end)` range in `n` for a given `rank`.
116-
// Example:
117-
// ```cpp
118-
// // n_items thread n_threads
119-
// even_share( 11, 0, 3); // -> [0, 4) -> 4 items
120-
// even_share( 11, 1, 3); // -> [4, 8) -> 4 items
121-
// even_share( 11, 2, 3); // -> [8, 11) -> 3 items
122-
// ```
123-
staticautoeven_share(Shape n, std::size_t rank, std::size_t size)noexcept
124-
-> std::pair<Shape, Shape> {
125-
constauto avg_per_thread = n / size;
126-
constauto n_big_share = avg_per_thread +1;
127-
constauto big_shares = n % size;
128-
constauto is_big_share = rank < big_shares;
129-
constauto begin = is_big_share
130-
? n_big_share * rank
131-
: n_big_share * big_shares + (rank - big_shares) * avg_per_thread;
132-
constauto end = begin + (is_big_share ? n_big_share : avg_per_thread);
133-
134-
returnstd::make_pair(begin, end);
135-
}
136-
137158
[[nodiscard]]
138159
autonum_agents_required()const -> std::uint32_t {
139160
// With work stealing, is std::min necessary, or can we feel free to ask for more agents (tasks)
@@ -162,10 +183,8 @@ namespace execpools {
162183
auto total_threads = self.num_agents_required();
163184

164185
auto computation = [&](auto&... args) {
165-
auto [begin, end] =even_share(self.shape_, tid, total_threads);
166-
for (Shape i = begin; i < end; ++i) {
167-
self.fun_(i, args...);
168-
}
186+
auto [begin, end] =exec::_pool_::even_share(self.shape_, tid, total_threads);
187+
self.fun_(begin, end, args...);
169188
};
170189

171190
auto completion = [&](auto&... args) {
@@ -376,10 +395,11 @@ namespace execpools {
376395
}
377396

378397
template<stdexec::__forwarding_query Tag,class... As>
379-
requires stdexec::__callable<Tag,const Sender&, As...>
398+
requires stdexec::__queryable_with<stdexec::env_of_t<Sender>, Tag, As...>
380399
autoquery(Tag, As&&... as)const
381-
noexcept(stdexec::__nothrow_callable<Tag,const Sender&, As...>) -> decltype(auto) {
382-
returnTag()(sndr_,static_cast<As&&>(as)...);
400+
noexcept(stdexec::__nothrow_queryable_with<stdexec::env_of_t<Sender>, Tag, As...>)
401+
-> decltype(auto) {
402+
return stdexec::__query<Tag>()(stdexec::get_env(sndr_),static_cast<As&&>(as)...);
383403
}
384404

385405
autoget_env()constnoexcept ->const __t& {
@@ -392,22 +412,6 @@ namespace execpools {
392412
usingbulk_sender_t =
393413
stdexec::__t<bulk_sender<stdexec::__id<stdexec::__decay_t<Sender>>, Shape, Fun>>;
394414

395-
STDEXEC_MEMFN_FRIEND(bulk);
396-
397-
template<stdexec::sender S, std::integral Shape,classFun>
398-
STDEXEC_MEMFN_DECL(
399-
auto bulk)(thisconst scheduler& sch, S&& sndr, Shape shape, Fun fun)noexcept
400-
-> bulk_sender_t<S, Shape, Fun> {
401-
returnbulk_sender_t<S, Shape, Fun>{
402-
*sch.pool_,static_cast<S&&>(sndr), shape,static_cast<Fun&&>(fun)};
403-
}
404-
405-
[[nodiscard]]
406-
constexprauto
407-
forward_progress_guarantee()constnoexcept -> stdexec::forward_progress_guarantee {
408-
return pool_->forward_progress_guarantee();
409-
}
410-
411415
friend thread_pool_base;
412416

413417
explicitscheduler(DerivedPoolType& pool)noexcept
@@ -421,12 +425,32 @@ namespace execpools {
421425
using __id = scheduler;
422426
autooperator==(const scheduler&)const ->bool =default;
423427

428+
424429
[[nodiscard]]
425430
constexprautoquery(stdexec::get_forward_progress_guarantee_t)constnoexcept
426431
-> stdexec::forward_progress_guarantee {
427-
returnforward_progress_guarantee();
432+
return pool_->forward_progress_guarantee();
433+
}
434+
435+
template<stdexec::__one_of<stdexec::set_value_t, stdexec::set_stopped_t> Tag>
436+
[[nodiscard]]
437+
constexprautoquery(stdexec::get_completion_scheduler_t<Tag>)constnoexcept -> scheduler {
438+
return *this;
439+
}
440+
441+
template<stdexec::__one_of<stdexec::set_value_t, stdexec::set_stopped_t> Tag>
442+
[[nodiscard]]
443+
constexprautoquery(stdexec::get_completion_domain_t<Tag>)constnoexcept -> domain {
444+
return {};
428445
}
429446

447+
template<stdexec::__one_of<stdexec::set_value_t, stdexec::set_stopped_t> Tag>
448+
[[nodiscard]]
449+
constexprautoquery(stdexec::get_completion_behavior_t<Tag>)constnoexcept {
450+
return stdexec::completion_behavior::asynchronous;
451+
}
452+
453+
[[nodiscard]]
430454
autoschedule()constnoexcept -> sender {
431455
return sender{*pool_};
432456
}

‎include/stdexec/__detail/__bulk.hpp‎

Lines changed: 19 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -219,34 +219,36 @@ namespace stdexec {
219219
};
220220

221221
structbulk_t :__generic_bulk_t<bulk_t> {
222-
template<class_Env>
223-
staticauto__transform_sender_fn(const _Env&) {
224-
return [&]<class_Data,class_Child>(__ignore, _Data&& __data, _Child&& __child) {
225-
using__shape_t = std::remove_cvref_t<decltype(__data.__shape_)>;
226-
auto __new_f =
227-
[__func =std::move(
228-
__data.__fun_)](__shape_t __begin,__shape_t __end,auto&&... __vs)mutable
222+
private:
223+
template<class_Fun>
224+
staticconstexprauto__mk_chunked_fn(_Fun&& __fun) {
225+
return [__fun_ =static_cast<_Fun&&>(
226+
__fun)]<classShape>(Shape __begin, Shape __end,auto&... __vs)mutable
229227
#if !STDEXEC_MSVC()
230-
// MSVCBUG https://developercommunity.visualstudio.com/t/noexcept-expression-in-lambda-template-n/10718680
231-
noexcept(noexcept(__data.__fun_(__begin++,__vs...)))
228+
// MSVCBUG https://developercommunity.visualstudio.com/t/noexcept-expression-in-lambda-template-n/10718680
229+
noexcept(__nothrow_callable<_Fun&, Shape,decltype(__vs)...>)
232230
#endif
233-
{
234-
while (__begin != __end)
235-
__func(__begin++, __vs...);
236-
};
231+
{
232+
while (__begin != __end)
233+
__fun_(__begin++, __vs...);
234+
};
235+
}
237236

237+
public:
238+
staticauto__transform_sender_fn()noexcept {
239+
return []<class_Data,class_Child>(__ignore, _Data&& __data, _Child&& __child) {
238240
// Lower `bulk` to `bulk_chunked`. If `bulk_chunked` is customized, we will see the customization.
239241
returnbulk_chunked(
240242
static_cast<_Child&&>(__child),
241243
__data.__pol_.__get(),
242244
__data.__shape_,
243-
std::move(__new_f));
245+
__mk_chunked_fn(std::move(__data.__fun_)));
244246
};
245247
}
246248

247-
template<class_Sender,class_Env>
248-
staticautotransform_sender(set_value_t, _Sender&& __sndr,const _Env& __env) {
249-
return__sexpr_apply(static_cast<_Sender&&>(__sndr),__transform_sender_fn(__env));
249+
template<class_Sender>
250+
staticautotransform_sender(set_value_t, _Sender&& __sndr,__ignore) {
251+
return__sexpr_apply(static_cast<_Sender&&>(__sndr),__transform_sender_fn());
250252
}
251253
};
252254

‎test/execpools/test_tbb_thread_pool.cpp‎

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ namespace {
8181

8282
TEST_CASE(
8383
"ex::on works when changing threads with execpools::tbb_thread_pool",
84-
"[adaptors][exec::starts_on]") {
84+
"[tbb_thread_pool]") {
8585
execpools::tbb_thread_pool pool;
8686
auto pool_sched = pool.get_scheduler();
8787
CHECK(
@@ -95,7 +95,7 @@ namespace {
9595
REQUIRE(called);
9696
}
9797

98-
TEST_CASE("more tbb_thread_pool") {
98+
TEST_CASE("more tbb_thread_pool","[tbb_thread_pool]") {
9999

100100
auto compute = [](int x) ->int {
101101
return x +1;
@@ -116,11 +116,11 @@ namespace {
116116
usingnamespacestdexec;
117117

118118
// clang-format off
119-
auto work =when_all(
120-
starts_on(tbb_sched,just(1)) |then(compute) |then(compute),
121-
starts_on(other_sched,just(0)) |then(compute) |continues_on(tbb_sched) |then(compute),
122-
starts_on(inline_sched,just(2)) |then(compute) |continues_on(other_sched) |then(compute) |continues_on(tbb_sched) |then(compute)
123-
);
119+
auto work =when_all(
120+
starts_on(tbb_sched,just(1)) |then(compute) |then(compute),
121+
starts_on(other_sched,just(0)) |then(compute) |continues_on(tbb_sched) |then(compute),
122+
starts_on(inline_sched,just(2)) |then(compute) |continues_on(other_sched) |then(compute) |continues_on(tbb_sched) |then(compute)
123+
);
124124
// clang-format on
125125

126126
// Launch the work and wait for the result:
@@ -131,7 +131,7 @@ namespace {
131131
}
132132

133133
#if !STDEXEC_STD_NO_EXCEPTIONS()
134-
TEST_CASE("tbb_thread_pool exceptions") {
134+
TEST_CASE("tbb_thread_pool exceptions","[tbb_thread_pool]") {
135135
// I know tbb::task_groups do cancellation with exceptions, which leaves them in a not-restartable
136136
// state. We'd better have it act normally here.
137137
usingnamespacestdexec;
@@ -158,7 +158,7 @@ namespace {
158158
}
159159
#endif// !STDEXEC_STD_NO_EXCEPTIONS()
160160

161-
TEST_CASE("tbb_thread_pool async_inclusive_scan") {
161+
TEST_CASE("tbb_thread_pool async_inclusive_scan","[tbb_thread_pool]") {
162162
constauto input = std::array{1.0,2.0, -1.0, -2.0};
163163
std::remove_const_t<decltype(input)> output;
164164
execpools::tbb_thread_pool pool{2};

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp