Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Attempt to re-establish websocket connection to Gateway#4777

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to ourterms of service andprivacy statement. We’ll occasionally send you account related emails.

Already on GitHub?Sign in to your account

Merged
Merged
Changes fromall commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Attempt to re-establish websocket connection to Gateway
When notebook (with `--gateway-url` option) lost the connection toGateway, notebook didn't connect to Gateway again although thewebsocket connection from the client was still alive.This change recovers the connection to Gateway to prevent this anomaly.Signed-off-by: Eunsoo Park <esevan.park@gmail.com>
  • Loading branch information
@esevan
esevan committedJul 23, 2019
commita1b133393813e76d45987f61d96f387c229f686a
26 changes: 18 additions & 8 deletionsnotebook/gateway/handlers.py
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -130,10 +130,12 @@ def __init__(self, **kwargs):
self.kernel_id = None
self.ws = None
self.ws_future = Future()
self.ws_future_cancelled = False
self.disconnected = False

@gen.coroutine
def _connect(self, kernel_id):
# websocket is initialized before connection
self.ws = None
self.kernel_id = kernel_id
ws_url = url_path_join(
GatewayClient.instance().ws_url,
Expand All@@ -148,40 +150,48 @@ def _connect(self, kernel_id):
self.ws_future.add_done_callback(self._connection_done)

def _connection_done(self, fut):
if not self.ws_future_cancelled: # prevent concurrent.futures._base.CancelledError
if not self.disconnected and fut.exception() is None: # prevent concurrent.futures._base.CancelledError
self.ws = fut.result()
self.log.debug("Connection is ready: ws: {}".format(self.ws))
else:
self.log.warning("Websocket connection has beencancelled via client disconnectbefore its establishment. "
self.log.warning("Websocket connection has beenclosed via client disconnector due to error. "
"Kernel with ID '{}' may not be terminated on GatewayClient: {}".
format(self.kernel_id, GatewayClient.instance().url))

def _disconnect(self):
self.disconnected = True
if self.ws is not None:
# Close connection
self.ws.close()
elif not self.ws_future.done():
# Cancel pending connection. Since future.cancel() is a noop on tornado, we'll track cancellation locally
self.ws_future.cancel()
self.ws_future_cancelled = True
self.log.debug("_disconnect: ws_future_cancelled: {}".format(self.ws_future_cancelled))
self.log.debug("_disconnect: future cancelled, disconnected: {}".format(self.disconnected))

@gen.coroutine
def _read_messages(self, callback):
"""Read messages from gateway server."""
whileTrue:
whileself.ws is not None:
message = None
if not self.ws_future_cancelled:
if not self.disconnected:
try:
message = yield self.ws.read_message()
except Exception as e:
self.log.error("Exception reading message from websocket: {}".format(e)) # , exc_info=True)
if message is None:
if not self.disconnected:
self.log.warning("Lost connection to Gateway: {}".format(self.kernel_id))
break
callback(message) # pass back to notebook client (see self.on_open and WebSocketChannelsHandler.open)
else: # ws cancelled - stop reading
break

if not self.disconnected: # if websocket is not disconnected by client, attept to reconnect to Gateway
self.log.info("Attempting to re-establish the connection to Gateway: {}".format(self.kernel_id))
self._connect(self.kernel_id)
loop = IOLoop.current()
loop.add_future(self.ws_future, lambda future: self._read_messages(callback))

def on_open(self, kernel_id, message_callback, **kwargs):
"""Web socket connection open against gateway server."""
self._connect(kernel_id)
Expand All@@ -205,7 +215,7 @@ def on_message(self, message):
def _write_message(self, message):
"""Send message to gateway server."""
try:
if not self.ws_future_cancelled:
if not self.disconnected and self.ws is not None:
self.ws.write_message(message)
except Exception as e:
self.log.error("Exception writing message to websocket: {}".format(e)) # , exc_info=True)
Expand Down

[8]ページ先頭

©2009-2025 Movatter.jp