Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit077b7e3

Browse files
authored
Merge pull requestalexmojaki#12 from alexmojaki/worker-server
Convert master to flask server instead of using rabbitmq
2 parents35ceffc +0d59b69 commit077b7e3

File tree

9 files changed

+77
-241
lines changed

9 files changed

+77
-241
lines changed

‎app.json‎

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,15 +5,11 @@
55
"DEBUG":"False"
66
},
77
"addons": [
8-
"cloudamqp",
98
"heroku-postgresql"
109
],
1110
"formation": {
1211
"web": {
1312
"quantity":1
14-
},
15-
"worker": {
16-
"quantity":1
1713
}
1814
},
1915
"stack":"container"

‎backend/entrypoint.sh‎

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,15 @@ set -ex
33

44
#nginx
55

6-
#rabbitmq-server -detached
7-
86
# We probably don't want this to be automatic but it makes life a lot easier
97
# For setting up the cloud
108
python manage.py migrate
119
python manage.py init_db
1210

13-
#python -m main.worker &
11+
# Prevent outdated from making http requests
12+
echo'["0.2.0", "2099-01-01 00:00:00"]'> /tmp/outdated_cache_outdated
13+
echo'["0.8.3", "2099-01-01 00:00:00"]'> /tmp/outdated_cache_birdseye
14+
15+
gunicorn --bind 127.0.0.1:5000 main.workers.master:app --access-logfile - --error-log - --threads 10 --worker-class gthread&
1416

1517
gunicorn -c gunicorn_config.py book.wsgi:application --bind 0.0.0.0:${PORT:-3000}

‎backend/main/workers/communications.py‎

Lines changed: 0 additions & 41 deletions
This file was deleted.

‎backend/main/workers/master.py‎

Lines changed: 56 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,15 @@
11
importatexit
2+
importmultiprocessing
23
importqueue
34
fromcollectionsimportdefaultdict
45
fromfunctoolsimportlru_cache
56
frommultiprocessingimportQueue,Process
67
fromthreadingimportThread
8+
fromtimeimportsleep
9+
10+
importflask
711

812
frommainimportsimple_settings
9-
frommain.workers.communicationsimportAbstractCommunications,ThreadCommunications
1013
frommain.workers.utilsimportinternal_error_result,make_result
1114
frommain.workers.workerimportworker_loop_in_thread
1215

@@ -48,23 +51,23 @@ def handle_entry(self, entry):
4851

4952
self.task_queue.put(entry)
5053

51-
defawait_result(self,callback):
52-
try:
53-
result=self._await_result()
54-
# if result["error"] and result["error"]["sentry_event"]:
55-
# event, hint = result["error"]["sentry_event"]
56-
# capture_event(event, hint)
57-
exceptException:
58-
result=internal_error_result()
54+
defawait_result(self):
55+
result=self._await_result()
56+
# if result["error"] and result["error"]["sentry_event"]:
57+
# event, hint = result["error"]["sentry_event"]
58+
# capture_event(event, hint)
5959
self.awaiting_input=result["awaiting_input"]
60-
callback(result)
60+
returnresult
6161

6262
def_await_result(self):
6363
# TODO cancel if result was cancelled by a newer handle_entry
6464
result=None
65+
# TODO handle initial timeout better
66+
timeout=10
6567
whileresultisNone:
6668
try:
67-
result=self.result_queue.get(timeout=3)
69+
result=self.result_queue.get(timeout=timeout)
70+
timeout=3
6871
exceptqueue.Empty:
6972
alive=self.process.is_alive()
7073
print(f"Process{alive=}")
@@ -82,59 +85,63 @@ def _await_result(self):
8285
returnresult
8386

8487

85-
defmaster_consumer_loop(comms:AbstractCommunications):
86-
comms=comms.make_master_side_communications()
87-
user_processes=defaultdict(UserProcess)
88+
user_processes=defaultdict(UserProcess)
89+
90+
app=flask.Flask(__name__)
91+
92+
multiprocessing.set_start_method("spawn")
93+
94+
95+
@app.route("/run",methods=["POST"])
96+
defrun():
97+
try:
98+
entry=flask.request.json
99+
user_process=user_processes[entry["user_id"]]
100+
user_process.handle_entry(entry)
101+
returnuser_process.await_result()
102+
exceptException:
103+
returninternal_error_result()
104+
88105

89-
whileTrue:
90-
entry=comms.recv_entry()
91-
user_id=str(entry["user_id"])
106+
@app.route("/health")
107+
defhealth():
108+
return"ok"
92109

93-
defcallback(result):
94-
comms.send_result(user_id,result)
95110

96-
try:
97-
user_process=user_processes[user_id]
98-
user_process.handle_entry(entry)
99-
Thread(
100-
target=user_process.await_result,
101-
args=[callback],
102-
).start()
103-
exceptException:
104-
callback(internal_error_result())
111+
defrun_server():
112+
app.run(host="0.0.0.0")
113+
114+
115+
master_url="http://localhost:5000/"
105116

106117

107118
@lru_cache()
108-
defmaster_communications()->AbstractCommunications:
109-
ifsimple_settings.CLOUDAMQP_URL:
110-
from .pikaimportPikaCommunications
111-
comms=PikaCommunications()
112-
else:
113-
comms=ThreadCommunications()
119+
defmaster_session():
120+
importrequests
121+
session=requests.Session()
114122

115123
ifnotsimple_settings.SEPARATE_WORKER_PROCESS:
116124
Thread(
117-
target=master_consumer_loop,
118-
args=[comms],
125+
target=run_server,
119126
daemon=True,
120-
name=master_consumer_loop.__name__,
127+
name=run_server.__name__,
121128
).start()
122129

123-
returncomms
124-
130+
# Wait until alive
131+
whileTrue:
132+
try:
133+
session.get(master_url+"health")
134+
break
135+
exceptrequests.exceptions.ConnectionError:
136+
sleep(1)
125137

126-
defworker_result(entry):
127-
comms:AbstractCommunications=master_communications()
128-
comms.send_entry(entry)
129-
user_id=str(entry["user_id"])
130-
returncomms.recv_result(user_id)
138+
returnsession
131139

132140

133-
defmain():
134-
frommain.workers.pikaimportPikaCommunications
135-
comms=PikaCommunications()
136-
master_consumer_loop(comms)
141+
defworker_result(entry):
142+
session=master_session()
143+
returnsession.post(master_url+"run",json=entry).json()
137144

138145

139146
if__name__=='__main__':
140-
main()
147+
run_server()

‎backend/main/workers/pika.py‎

Lines changed: 0 additions & 98 deletions
This file was deleted.

‎docker-compose.yml‎

Lines changed: 9 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -12,29 +12,18 @@ services:
1212
env_file:.env
1313
environment:
1414
SEPARATE_WORKER_PROCESS:'True'
15-
CLOUDAMQP_URL:amqp://rabbitmq
1615
DATABASE_URL:postgres://postgres:pass@database/postgres
1716
stdin_open:true
1817
tty:true
19-
depends_on:
20-
-rabbitmq
21-
worker:
22-
image:python-init
23-
build:.
24-
env_file:.env
25-
stdin_open:true
26-
tty:true
27-
entrypoint:python -m main.workers.master
28-
environment:
29-
SEPARATE_WORKER_PROCESS:'True'
30-
CLOUDAMQP_URL:amqp://rabbitmq
31-
depends_on:
32-
-rabbitmq
33-
rabbitmq:
34-
image:rabbitmq
35-
ports:
36-
-5672:5672
37-
-15672:15672
18+
# worker:
19+
# image: python-init
20+
# build: .
21+
# env_file: .env
22+
# stdin_open: true
23+
# tty: true
24+
# entrypoint: gunicorn --threads 10 --bind 0.0.0.0:5000 main.workers.master:app
25+
# ports:
26+
# - 5000:5000
3827
database:
3928
image:postgres:12
4029
volumes:

‎heroku.yml‎

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,3 @@
11
build:
22
docker:
33
web:Dockerfile
4-
worker:Dockerfile
5-
run:
6-
worker:python -m main.workers.master

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp