@@ -17,7 +17,6 @@ worker_t::worker_t(callbacks* const slots, std::function<void()> init_cb)
17
17
worker_t ::~worker_t () {
18
18
active_ =false ;
19
19
event_.signaled ();
20
- complete_.wait ();
21
20
22
21
if (thread_.joinable ()) {
23
22
thread_.join ();
@@ -30,12 +29,10 @@ void worker_t::assign(object_t* const obj,
30
29
31
30
object_ = obj;
32
31
start_ =std::chrono::steady_clock::now ();
33
- time_ = slice;
34
- // Так как поток оперирует объектом, то необходимо захватить
35
- // ссылку на объект, иначе объект может быть удален
36
- // во время обработки сообщений
32
+ time_slice_ = slice;
33
+ // Acquire the object.
37
34
runtime_t::instance ()->acquire (obj);
38
- // Активировать поток
35
+ // Wakeup the thread.
39
36
event_.signaled ();
40
37
}
41
38
@@ -45,101 +42,80 @@ void worker_t::wakeup() {
45
42
46
43
void worker_t::execute () {
47
44
while (active_) {
48
- //
49
- // Если данному потоку назначен объект, то необходимо
50
- // обрабатывать сообщения, ему поступившие
51
- //
52
45
if (!process ()) {
53
46
slots_->push_idle (this );
54
47
}
55
48
56
- //
57
- // Ждать, пока не появится новое задание для данного потока
58
- //
59
- event_.wait ();// Cond: (m_object != 0) || (m_active == false)
49
+ // Wait for new tasks.
50
+ event_.wait ();// Cond: (object_ != 0) || (active_ == false)
60
51
}
61
-
62
- complete_.signaled ();
63
- }
64
-
65
- bool worker_t::check_deleting (object_t *const obj) {
66
- // TN: В контексте рабочего потока
67
- std::lock_guard<std::recursive_mutex>g (obj->cs );
68
-
69
- if (obj->has_messages ()) {
70
- return false ;
71
- }
72
- if (!obj->exclusive || obj->deleting ) {
73
- obj->scheduled =false ;
74
- object_ =nullptr ;
75
- }
76
- // Если текущий объект необходимо удалить
77
- if (obj->deleting ) {
78
- return true ;
79
- }
80
-
81
- return false ;
82
52
}
83
53
84
54
bool worker_t::process () {
85
55
while (object_t *const obj = object_) {
86
- bool released =false ;
87
-
88
- while (auto msg = obj->select_message ()) {
89
- assert (obj->impl );
90
-
91
- // Обработать сообщение.
92
- slots_->handle_message (std::move (msg));
93
-
94
- // Проверить истечение лимита времени обработки для данного объекта.
95
- if (!obj->exclusive &&
96
- ((std::chrono::steady_clock::now () - start_) > time_)) {
97
- std::lock_guard<std::recursive_mutex>g (obj->cs );
98
-
99
- if (!obj->deleting ) {
100
- assert (obj->impl );
56
+ bool need_delete =false ;
57
+
58
+ while (true ) {
59
+ // Handle a message.
60
+ if (auto msg = obj->select_message ()) {
61
+ slots_->handle_message (std::move (msg));
62
+ // Continue processing messages if the object is bound to the thread or
63
+ // the time slice has not been elapsed yet.
64
+ if (obj->exclusive ||
65
+ time_slice_ >= (std::chrono::steady_clock::now () - start_))
66
+ {
67
+ continue ;
68
+ }
69
+ }
101
70
102
- if (obj->has_messages ()) {
103
- runtime_t::instance ()->release (obj);
71
+ // There are no messages in the object's mailbox or
72
+ // the time slice was elapsed.
73
+ std::lock_guard<std::recursive_mutex>g (obj->cs );
104
74
105
- slots_->push_object (obj);
75
+ if (obj->deleting ) {
76
+ need_delete =true ;
77
+ // Drain the object's mailbox if it in the deleting state.
78
+ if (obj->has_messages ()) {
79
+ time_slice_ =std::chrono::steady_clock::duration::max ();
80
+ continue ;
81
+ }
106
82
107
- released =true ;
108
- }else {
109
- obj->scheduled =false ;
110
- }
111
- object_ =nullptr ;
112
- break ;
83
+ obj->scheduled =false ;
84
+ }else if (obj->exclusive ) {
85
+ // Just wait for new messages if the object
86
+ // exclusively bound to the thread.
87
+ return true ;
88
+ }else {
89
+ if (obj->has_messages ()) {
90
+ // Return object to the shared queue.
91
+ slots_->push_object (obj);
92
+ }else {
93
+ obj->scheduled =false ;
113
94
}
114
95
}
115
- }
116
96
117
- if (!released &&check_deleting (obj)) {
118
- slots_->push_delete (obj);
97
+ break ;
119
98
}
120
99
121
- //
122
- // Получить новый объект для обработки, если он есть в очереди
123
- //
124
- if (object_) {
125
- if (object_->exclusive ) {
126
- return true ;
100
+ if (need_delete) {
101
+ if (runtime_t::instance ()->release (obj)) {
102
+ slots_->push_delete (obj);
127
103
}
128
104
}else {
129
- // Освободить ссылку на предыдущий объект
130
- if (!released) {
131
- runtime_t::instance ()->release (obj);
132
- }
133
- // / Retrieve next object from the queue.
134
- if ((object_ = slots_->pop_object ())) {
135
- start_ =std::chrono::steady_clock::now ();
136
- runtime_t::instance ()->acquire (object_);
137
- }else {
138
- // Поместить текущий поток в список свободных
139
- return false ;
140
- }
105
+ runtime_t::instance ()->release (obj);
106
+ }
107
+
108
+ // Retrieve next object from the shared queue.
109
+ if ((object_ = slots_->pop_object ())) {
110
+ start_ =std::chrono::steady_clock::now ();
111
+ runtime_t::instance ()->acquire (object_);
112
+ }else {
113
+ // Nothing to do.
114
+ // Put itself to the idle list.
115
+ return false ;
141
116
}
142
117
}
118
+
143
119
return true ;
144
120
}
145
121