@@ -42,7 +42,46 @@ namespace execpools {
4242template <class DerivedPoolType_ ,class ReceiverId >
4343friend struct operation ;
4444
45+ template <class >
46+ struct not_a_sender {
47+ using sender_concept = stdexec::sender_t ;
48+ };
49+
4550public:
51+ struct scheduler ;
52+
53+ struct domain : stdexec::default_domain {
54+ using __t = domain;
55+ using __id = domain;
56+
57+ template <stdexec::sender_expr_for<stdexec::bulk_chunked_t > Sender,class Env >
58+ static constexpr auto transform_sender (stdexec::set_value_t , Sender&& sndr,const Env& env) {
59+ auto & [tag, data, child] = sndr;
60+ auto & [pol, shape, fun] = data;
61+
62+ if constexpr (stdexec::__completes_on<decltype (child), scheduler, Env>) {
63+ auto sch =
64+ stdexec::get_completion_scheduler<stdexec::set_value_t >(stdexec::get_env (child), env);
65+ using sender_t =
66+ scheduler::template bulk_sender_t <decltype (child),decltype (shape),decltype (fun)>;
67+ return sender_t {
68+ *sch.pool_ ,
69+ stdexec::__forward_like<Sender>(child),
70+ shape,
71+ stdexec::__forward_like<Sender>(fun)};
72+ }else {
73+ static_assert (
74+ stdexec::__completes_on<Sender, scheduler, Env>,
75+ " Unable to dispatch bulk work to the static_thread_pool. The predecessor sender"
76+ " is not able to provide a static_thread_pool scheduler." );
77+ return not_a_sender<stdexec::__name_of<Sender>>();
78+ }
79+ }
80+
81+ template <stdexec::sender_expr_for<stdexec::bulk_unchunked_t > Sender,class Env >
82+ static constexpr auto transform_sender (stdexec::set_value_t , Sender&& sndr,const Env& env);
83+ };
84+
4685struct scheduler {
4786private:
4887template <class DerivedPoolType_ ,class ReceiverId >
@@ -62,6 +101,11 @@ namespace execpools {
62101return pool_.get_scheduler ();
63102 }
64103
104+ template <class CPO >
105+ auto query (stdexec::get_completion_domain_t <CPO>)const noexcept -> domain {
106+ return {};
107+ }
108+
65109auto get_env ()const noexcept ->const sender& {
66110return *this ;
67111 }
@@ -111,29 +155,6 @@ namespace execpools {
111155 std::atomic<std::uint32_t > thread_with_exception_{0 };
112156 std::exception_ptr exception_;
113157
114- // Splits `n` into `size` chunks distributing `n % size` evenly between ranks.
115- // Returns `[begin, end)` range in `n` for a given `rank`.
116- // Example:
117- // ```cpp
118- // // n_items thread n_threads
119- // even_share( 11, 0, 3); // -> [0, 4) -> 4 items
120- // even_share( 11, 1, 3); // -> [4, 8) -> 4 items
121- // even_share( 11, 2, 3); // -> [8, 11) -> 3 items
122- // ```
123- static auto even_share (Shape n, std::size_t rank, std::size_t size)noexcept
124- -> std::pair<Shape, Shape> {
125- const auto avg_per_thread = n / size;
126- const auto n_big_share = avg_per_thread +1 ;
127- const auto big_shares = n % size;
128- const auto is_big_share = rank < big_shares;
129- const auto begin = is_big_share
130- ? n_big_share * rank
131- : n_big_share * big_shares + (rank - big_shares) * avg_per_thread;
132- const auto end = begin + (is_big_share ? n_big_share : avg_per_thread);
133-
134- return std::make_pair (begin, end);
135- }
136-
137158 [[nodiscard]]
138159auto num_agents_required ()const -> std::uint32_t {
139160// With work stealing, is std::min necessary, or can we feel free to ask for more agents (tasks)
@@ -162,10 +183,8 @@ namespace execpools {
162183auto total_threads = self.num_agents_required ();
163184
164185auto computation = [&](auto &... args) {
165- auto [begin, end] =even_share (self.shape_ , tid, total_threads);
166- for (Shape i = begin; i < end; ++i) {
167- self.fun_ (i, args...);
168- }
186+ auto [begin, end] =exec::_pool_::even_share (self.shape_ , tid, total_threads);
187+ self.fun_ (begin, end, args...);
169188 };
170189
171190auto completion = [&](auto &... args) {
@@ -376,10 +395,11 @@ namespace execpools {
376395 }
377396
378397template <stdexec::__forwarding_query Tag,class ... As>
379- requires stdexec::__callable<Tag, const Sender& , As...>
398+ requires stdexec::__queryable_with<stdexec:: env_of_t <Sender>, Tag , As...>
380399auto query (Tag, As&&... as)const
381- noexcept (stdexec::__nothrow_callable<Tag,const Sender&, As...>) -> decltype(auto ) {
382- return Tag ()(sndr_,static_cast <As&&>(as)...);
400+ noexcept (stdexec::__nothrow_queryable_with<stdexec::env_of_t <Sender>, Tag, As...>)
401+ -> decltype(auto ) {
402+ return stdexec::__query<Tag>()(stdexec::get_env (sndr_),static_cast <As&&>(as)...);
383403 }
384404
385405auto get_env ()const noexcept ->const __t& {
@@ -392,22 +412,6 @@ namespace execpools {
392412using bulk_sender_t =
393413 stdexec::__t <bulk_sender<stdexec::__id<stdexec::__decay_t <Sender>>, Shape, Fun>>;
394414
395- STDEXEC_MEMFN_FRIEND (bulk);
396-
397- template <stdexec::sender S, std::integral Shape,class Fun >
398- STDEXEC_MEMFN_DECL (
399- auto bulk)(this const scheduler& sch, S&& sndr, Shape shape, Fun fun)noexcept
400- -> bulk_sender_t<S, Shape, Fun> {
401- return bulk_sender_t <S, Shape, Fun>{
402- *sch.pool_ ,static_cast <S&&>(sndr), shape,static_cast <Fun&&>(fun)};
403- }
404-
405- [[nodiscard]]
406- constexpr auto
407- forward_progress_guarantee ()const noexcept -> stdexec::forward_progress_guarantee {
408- return pool_->forward_progress_guarantee ();
409- }
410-
411415friend thread_pool_base;
412416
413417explicit scheduler (DerivedPoolType& pool)noexcept
@@ -421,12 +425,32 @@ namespace execpools {
421425using __id = scheduler;
422426auto operator ==(const scheduler&)const ->bool =default ;
423427
428+
424429 [[nodiscard]]
425430constexpr auto query (stdexec::get_forward_progress_guarantee_t )const noexcept
426431 -> stdexec::forward_progress_guarantee {
427- return forward_progress_guarantee ();
432+ return pool_->forward_progress_guarantee ();
433+ }
434+
435+ template <stdexec::__one_of<stdexec::set_value_t , stdexec::set_stopped_t > Tag>
436+ [[nodiscard]]
437+ constexpr auto query (stdexec::get_completion_scheduler_t <Tag>)const noexcept -> scheduler {
438+ return *this ;
439+ }
440+
441+ template <stdexec::__one_of<stdexec::set_value_t , stdexec::set_stopped_t > Tag>
442+ [[nodiscard]]
443+ constexpr auto query (stdexec::get_completion_domain_t <Tag>)const noexcept -> domain {
444+ return {};
428445 }
429446
447+ template <stdexec::__one_of<stdexec::set_value_t , stdexec::set_stopped_t > Tag>
448+ [[nodiscard]]
449+ constexpr auto query (stdexec::get_completion_behavior_t <Tag>)const noexcept {
450+ return stdexec::completion_behavior::asynchronous;
451+ }
452+
453+ [[nodiscard]]
430454auto schedule ()const noexcept -> sender {
431455return sender{*pool_};
432456 }