Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitdc8050d

Browse files
committed
simplify the processing logic in worker thread
1 parenta21b2e4 commitdc8050d

File tree

4 files changed

+76
-94
lines changed

4 files changed

+76
-94
lines changed

‎lib/acto.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -346,7 +346,7 @@ class actor {
346346
};
347347

348348
public:
349-
virtual~actor() =default;
349+
virtual~actor()noexcept=default;
350350

351351
protected:
352352
inline actor_refcontext()const {

‎lib/runtime.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -330,7 +330,7 @@ object_t* runtime_t::create_actor(std::unique_ptr<actor> body,
330330

331331
++workers_.reserved;
332332

333-
worker->assign(result,std::chrono::steady_clock::duration());
333+
worker->assign(result,std::chrono::milliseconds(500));
334334
}
335335
}
336336

@@ -369,7 +369,7 @@ void runtime_t::execute() {
369369
if (!worker) {
370370
// Если текущее количество потоков меньше оптимального,
371371
// то создать новый поток
372-
if (workers_.count < (workers_.reserved +(m_processors <<1))) {
372+
if (workers_.count < (workers_.reserved + m_processors)) {
373373
worker =create_worker();
374374
}else {
375375
// Подождать некоторое время осовобождения какого-нибудь потока

‎lib/worker.cpp

Lines changed: 56 additions & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@ worker_t::worker_t(callbacks* const slots, std::function<void()> init_cb)
1717
worker_t::~worker_t() {
1818
active_ =false;
1919
event_.signaled();
20-
complete_.wait();
2120

2221
if (thread_.joinable()) {
2322
thread_.join();
@@ -30,12 +29,10 @@ void worker_t::assign(object_t* const obj,
3029

3130
object_ = obj;
3231
start_ =std::chrono::steady_clock::now();
33-
time_ = slice;
34-
// Так как поток оперирует объектом, то необходимо захватить
35-
// ссылку на объект, иначе объект может быть удален
36-
// во время обработки сообщений
32+
time_slice_ = slice;
33+
// Acquire the object.
3734
runtime_t::instance()->acquire(obj);
38-
//Активировать поток
35+
//Wakeup the thread.
3936
event_.signaled();
4037
}
4138

@@ -45,101 +42,80 @@ void worker_t::wakeup() {
4542

4643
voidworker_t::execute() {
4744
while (active_) {
48-
//
49-
// Если данному потоку назначен объект, то необходимо
50-
// обрабатывать сообщения, ему поступившие
51-
//
5245
if (!process()) {
5346
slots_->push_idle(this);
5447
}
5548

56-
//
57-
// Ждать, пока не появится новое задание для данного потока
58-
//
59-
event_.wait();// Cond: (m_object != 0) || (m_active == false)
49+
// Wait for new tasks.
50+
event_.wait();// Cond: (object_ != 0) || (active_ == false)
6051
}
61-
62-
complete_.signaled();
63-
}
64-
65-
boolworker_t::check_deleting(object_t*const obj) {
66-
// TN: В контексте рабочего потока
67-
std::lock_guard<std::recursive_mutex>g(obj->cs);
68-
69-
if (obj->has_messages()) {
70-
returnfalse;
71-
}
72-
if (!obj->exclusive || obj->deleting) {
73-
obj->scheduled =false;
74-
object_ =nullptr;
75-
}
76-
// Если текущий объект необходимо удалить
77-
if (obj->deleting) {
78-
returntrue;
79-
}
80-
81-
returnfalse;
8252
}
8353

8454
boolworker_t::process() {
8555
while (object_t*const obj = object_) {
86-
bool released =false;
87-
88-
while (auto msg = obj->select_message()) {
89-
assert(obj->impl);
90-
91-
// Обработать сообщение.
92-
slots_->handle_message(std::move(msg));
93-
94-
// Проверить истечение лимита времени обработки для данного объекта.
95-
if (!obj->exclusive &&
96-
((std::chrono::steady_clock::now() - start_) > time_)) {
97-
std::lock_guard<std::recursive_mutex>g(obj->cs);
98-
99-
if (!obj->deleting) {
100-
assert(obj->impl);
56+
bool need_delete =false;
57+
58+
while (true) {
59+
// Handle a message.
60+
if (auto msg = obj->select_message()) {
61+
slots_->handle_message(std::move(msg));
62+
// Continue processing messages if the object is bound to the thread or
63+
// the time slice has not been elapsed yet.
64+
if (obj->exclusive ||
65+
time_slice_ >= (std::chrono::steady_clock::now() - start_))
66+
{
67+
continue;
68+
}
69+
}
10170

102-
if (obj->has_messages()) {
103-
runtime_t::instance()->release(obj);
71+
// There are no messages in the object's mailbox or
72+
// the time slice was elapsed.
73+
std::lock_guard<std::recursive_mutex>g(obj->cs);
10474

105-
slots_->push_object(obj);
75+
if (obj->deleting) {
76+
need_delete =true;
77+
// Drain the object's mailbox if it in the deleting state.
78+
if (obj->has_messages()) {
79+
time_slice_ =std::chrono::steady_clock::duration::max();
80+
continue;
81+
}
10682

107-
released =true;
108-
}else {
109-
obj->scheduled =false;
110-
}
111-
object_ =nullptr;
112-
break;
83+
obj->scheduled =false;
84+
}elseif (obj->exclusive) {
85+
// Just wait for new messages if the object
86+
// exclusively bound to the thread.
87+
returntrue;
88+
}else {
89+
if (obj->has_messages()) {
90+
// Return object to the shared queue.
91+
slots_->push_object(obj);
92+
}else {
93+
obj->scheduled =false;
11394
}
11495
}
115-
}
11696

117-
if (!released &&check_deleting(obj)) {
118-
slots_->push_delete(obj);
97+
break;
11998
}
12099

121-
//
122-
// Получить новый объект для обработки, если он есть в очереди
123-
//
124-
if (object_) {
125-
if (object_->exclusive) {
126-
returntrue;
100+
if (need_delete) {
101+
if (runtime_t::instance()->release(obj)) {
102+
slots_->push_delete(obj);
127103
}
128104
}else {
129-
// Освободить ссылку на предыдущий объект
130-
if (!released) {
131-
runtime_t::instance()->release(obj);
132-
}
133-
/// Retrieve next object from the queue.
134-
if ((object_ = slots_->pop_object())) {
135-
start_ =std::chrono::steady_clock::now();
136-
runtime_t::instance()->acquire(object_);
137-
}else {
138-
// Поместить текущий поток в список свободных
139-
returnfalse;
140-
}
105+
runtime_t::instance()->release(obj);
106+
}
107+
108+
// Retrieve next object from the shared queue.
109+
if ((object_ = slots_->pop_object())) {
110+
start_ =std::chrono::steady_clock::now();
111+
runtime_t::instance()->acquire(object_);
112+
}else {
113+
// Nothing to do.
114+
// Put itself to the idle list.
115+
returnfalse;
141116
}
142117
}
118+
143119
returntrue;
144120
}
145121

‎lib/worker.h

Lines changed: 17 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -16,18 +16,27 @@ struct object_t;
1616
structmsg_t;
1717

1818
/**
19-
*Системный поток
19+
*Worker thread.
2020
*/
2121
classworker_t :publicgenerics::intrusive_t<worker_t> {
2222
public:
2323
classcallbacks {
2424
public:
2525
virtual~callbacks() =default;
2626

27+
/** Process the message.*/
2728
virtualvoidhandle_message(std::unique_ptr<msg_t>) = 0;
29+
30+
/** Schedule object to delete.*/
2831
virtualvoidpush_delete(object_t*const) = 0;
32+
33+
/** Put itself to idle list.*/
2934
virtualvoidpush_idle(worker_t*const) = 0;
35+
36+
/** Return object to shared queue.*/
3037
virtualvoidpush_object(object_t*const) = 0;
38+
39+
/** Try to acquire additional job.*/
3140
virtualobject_t*pop_object() = 0;
3241
};
3342

@@ -36,7 +45,7 @@ class worker_t : public generics::intrusive_t<worker_t> {
3645
~worker_t();
3746

3847
/**
39-
* Assignsthe object to the worker.
48+
* Assignsan object to the worker.
4049
*/
4150
voidassign(object_t*const obj,
4251
const std::chrono::steady_clock::duration slice);
@@ -46,26 +55,23 @@ class worker_t : public generics::intrusive_t<worker_t> {
4655
private:
4756
voidexecute();
4857

49-
///
50-
boolcheck_deleting(object_t*const obj);
51-
52-
///
53-
/// \return true - если есть возможность обработать следующие сообщения
54-
/// false - если сообщений больше нет
58+
/**
59+
* @return true if there are some messages in object's mailbox.
60+
* @return false if there are no more messages in object's mailbox.
61+
*/
5562
boolprocess();
5663

5764
private:
5865
callbacks*const slots_;
59-
///Флаг активности потока
66+
///Activity flag.
6067
std::atomic<bool> active_{true};
6168
/// Current assigned object.
6269
object_t* object_{nullptr};
6370

6471
std::chrono::steady_clock::time_point start_{};
65-
std::chrono::steady_clock::durationtime_{};
72+
std::chrono::steady_clock::durationtime_slice_{};
6673

6774
event_t event_{true};
68-
event_t complete_;
6975
std::thread thread_;
7076
};
7177

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp