Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit821a0f7

Browse files
committed
Don't defer WebSocket opening,closeAsyncHttpClient#1348
Motivation:We currently buffer WebSocket opening until first LastHttpContentreception with the UpgradeCallback.This doesn't make sense, and forces us to buffer any frame that mightbe sent along with the upgrade response.Modifications:* Drop UpgradeHandler that's never used as an abstraction* Perform upgrade/abort as soon as response is received* Ignore LastHttpContent* No need to buffer any frameResult:More simple code
1 parentbff695e commit821a0f7

File tree

4 files changed

+62
-173
lines changed

4 files changed

+62
-173
lines changed

‎client/src/main/java/org/asynchttpclient/netty/handler/WebSocketHandler.java‎

Lines changed: 43 additions & 83 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
importio.netty.handler.codec.http.HttpHeaderValues;
2222
importio.netty.handler.codec.http.HttpRequest;
2323
importio.netty.handler.codec.http.HttpResponse;
24+
importio.netty.handler.codec.http.LastHttpContent;
2425
importio.netty.handler.codec.http.websocketx.BinaryWebSocketFrame;
2526
importio.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
2627
importio.netty.handler.codec.http.websocketx.PingWebSocketFrame;
@@ -36,7 +37,6 @@
3637
importorg.asynchttpclient.HttpResponseStatus;
3738
importorg.asynchttpclient.netty.NettyResponseFuture;
3839
importorg.asynchttpclient.netty.NettyResponseStatus;
39-
importorg.asynchttpclient.netty.OnLastHttpContentCallback;
4040
importorg.asynchttpclient.netty.channel.ChannelManager;
4141
importorg.asynchttpclient.netty.channel.Channels;
4242
importorg.asynchttpclient.netty.request.NettyRequestSender;
@@ -52,71 +52,45 @@ public WebSocketHandler(AsyncHttpClientConfig config,//
5252
super(config,channelManager,requestSender);
5353
}
5454

55-
privateclassUpgradeCallbackextendsOnLastHttpContentCallback {
56-
57-
privatefinalChannelchannel;
58-
privatefinalHttpResponseresponse;
59-
privatefinalWebSocketUpgradeHandlerhandler;
60-
privatefinalHttpResponseStatusstatus;
61-
privatefinalHttpResponseHeadersresponseHeaders;
62-
63-
publicUpgradeCallback(NettyResponseFuture<?>future,Channelchannel,HttpResponseresponse,WebSocketUpgradeHandlerhandler,HttpResponseStatusstatus,
64-
HttpResponseHeadersresponseHeaders) {
65-
super(future);
66-
this.channel =channel;
67-
this.response =response;
68-
this.handler =handler;
69-
this.status =status;
70-
this.responseHeaders =responseHeaders;
55+
privatevoidupgrade(Channelchannel,NettyResponseFuture<?>future,WebSocketUpgradeHandlerhandler,HttpResponseresponse,HttpResponseHeadersresponseHeaders)
56+
throwsException {
57+
booleanvalidStatus =response.status().equals(SWITCHING_PROTOCOLS);
58+
booleanvalidUpgrade =response.headers().get(UPGRADE) !=null;
59+
Stringconnection =response.headers().get(CONNECTION);
60+
booleanvalidConnection =HttpHeaderValues.UPGRADE.contentEqualsIgnoreCase(connection);
61+
finalbooleanheaderOK =handler.onHeadersReceived(responseHeaders) ==State.CONTINUE;
62+
if (!headerOK || !validStatus || !validUpgrade || !validConnection) {
63+
requestSender.abort(channel,future,newIOException("Invalid handshake response"));
64+
return;
7165
}
7266

73-
// We don't need to synchronize as replacing the "ws-decoder" will
74-
// process using the same thread.
75-
privatevoidinvokeOnSucces(Channelchannel,WebSocketUpgradeHandlerh) {
76-
try {
77-
h.onSuccess(newNettyWebSocket(channel,responseHeaders.getHeaders()));
78-
}catch (Exceptionex) {
79-
logger.warn("onSuccess unexpected exception",ex);
80-
}
67+
Stringaccept =response.headers().get(SEC_WEBSOCKET_ACCEPT);
68+
Stringkey =getAcceptKey(future.getNettyRequest().getHttpRequest().headers().get(SEC_WEBSOCKET_KEY));
69+
if (accept ==null || !accept.equals(key)) {
70+
requestSender.abort(channel,future,newIOException("Invalid challenge. Actual: " +accept +". Expected: " +key));
8171
}
8272

83-
@Override
84-
publicvoidcall()throwsException {
85-
booleanvalidStatus =response.status().equals(SWITCHING_PROTOCOLS);
86-
booleanvalidUpgrade =response.headers().get(UPGRADE) !=null;
87-
Stringconnection =response.headers().get(CONNECTION);
88-
booleanvalidConnection =HttpHeaderValues.UPGRADE.contentEqualsIgnoreCase(connection);
89-
booleanstatusReceived =handler.onStatusReceived(status) ==State.CONTINUE;
90-
91-
if (!statusReceived) {
92-
try {
93-
handler.onCompleted();
94-
}finally {
95-
future.done();
96-
}
97-
return;
98-
}
99-
100-
finalbooleanheaderOK =handler.onHeadersReceived(responseHeaders) ==State.CONTINUE;
101-
if (!headerOK || !validStatus || !validUpgrade || !validConnection) {
102-
requestSender.abort(channel,future,newIOException("Invalid handshake response"));
103-
return;
104-
}
73+
// set back the future so the protocol gets notified of frames
74+
// removing the HttpClientCodec from the pipeline might trigger a read with a WebSocket message
75+
// if it comes in the same frame as the HTTP Upgrade response
76+
Channels.setAttribute(channel,future);
10577

106-
Stringaccept =response.headers().get(SEC_WEBSOCKET_ACCEPT);
107-
Stringkey =getAcceptKey(future.getNettyRequest().getHttpRequest().headers().get(SEC_WEBSOCKET_KEY));
108-
if (accept ==null || !accept.equals(key)) {
109-
requestSender.abort(channel,future,newIOException(String.format("Invalid challenge. Actual: %s. Expected: %s",accept,key)));
110-
}
78+
channelManager.upgradePipelineForWebSockets(channel.pipeline());
11179

112-
// set back the future so the protocol gets notified of frames
113-
// removing the HttpClientCodec from the pipeline might trigger a read with a WebSocket message
114-
// if it comes in the same frame as the HTTP Upgrade response
115-
Channels.setAttribute(channel,future);
116-
117-
channelManager.upgradePipelineForWebSockets(channel.pipeline());
80+
// We don't need to synchronize as replacing the "ws-decoder" will
81+
// process using the same thread.
82+
try {
83+
handler.openWebSocket(newNettyWebSocket(channel,responseHeaders.getHeaders()));
84+
}catch (Exceptionex) {
85+
logger.warn("onSuccess unexpected exception",ex);
86+
}
87+
future.done();
88+
}
11889

119-
invokeOnSucces(channel,handler);
90+
privatevoidabort(NettyResponseFuture<?>future,WebSocketUpgradeHandlerhandler,HttpResponseStatusstatus)throwsException {
91+
try {
92+
handler.onThrowable(newIOException("Invalid Status code=" +status.getStatusCode() +" text=" +status.getStatusText()));
93+
}finally {
12094
future.done();
12195
}
12296
}
@@ -136,36 +110,23 @@ public void handleRead(Channel channel, NettyResponseFuture<?> future, Object e)
136110
HttpResponseHeadersresponseHeaders =newHttpResponseHeaders(response.headers());
137111

138112
if (!interceptors.exitAfterIntercept(channel,future,handler,response,status,responseHeaders)) {
139-
Channels.setAttribute(channel,newUpgradeCallback(future,channel,response,handler,status,responseHeaders));
113+
switch (handler.onStatusReceived(status)) {
114+
caseCONTINUE:
115+
upgrade(channel,future,handler,response,responseHeaders);
116+
break;
117+
default:
118+
abort(future,handler,status);
119+
}
140120
}
141121

142122
}elseif (einstanceofWebSocketFrame) {
143123
finalWebSocketFrameframe = (WebSocketFrame)e;
144124
WebSocketUpgradeHandlerhandler = (WebSocketUpgradeHandler)future.getAsyncHandler();
145125
NettyWebSocketwebSocket = (NettyWebSocket)handler.onCompleted();
126+
handleFrame(channel,frame,handler,webSocket);
146127

147-
if (webSocket !=null) {
148-
handleFrame(channel,frame,handler,webSocket);
149-
}else {
150-
logger.debug("Frame received but WebSocket is not available yet, buffering frame");
151-
frame.retain();
152-
RunnablebufferedFrame =newRunnable() {
153-
publicvoidrun() {
154-
try {
155-
// WebSocket is now not null
156-
NettyWebSocketwebSocket = (NettyWebSocket)handler.onCompleted();
157-
handleFrame(channel,frame,handler,webSocket);
158-
}catch (Exceptione) {
159-
logger.debug("Failure while handling buffered frame",e);
160-
handler.onFailure(e);
161-
}finally {
162-
frame.release();
163-
}
164-
}
165-
};
166-
handler.bufferFrame(bufferedFrame);
167-
}
168-
}else {
128+
}elseif (!(einstanceofLastHttpContent)) {
129+
// ignore, end of handshake response
169130
logger.error("Invalid message {}",e);
170131
}
171132
}
@@ -197,7 +158,6 @@ public void handleException(NettyResponseFuture<?> future, Throwable e) {
197158

198159
try {
199160
WebSocketUpgradeHandlerh = (WebSocketUpgradeHandler)future.getAsyncHandler();
200-
201161
NettyWebSocketwebSocket =NettyWebSocket.class.cast(h.onCompleted());
202162
if (webSocket !=null) {
203163
webSocket.onError(e.getCause());

‎client/src/main/java/org/asynchttpclient/ws/UpgradeHandler.java‎

Lines changed: 0 additions & 37 deletions
This file was deleted.

‎client/src/main/java/org/asynchttpclient/ws/WebSocketUpgradeHandler.java‎

Lines changed: 16 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
11
/*
2-
* Copyright (c)2010-2012 Sonatype, Inc. All rights reserved.
2+
* Copyright (c)2017 AsyncHttpClient Project. All rights reserved.
33
*
44
* This program is licensed to you under the Apache License Version 2.0,
55
* and you may not use this file except in compliance with the Apache License Version 2.0.
6-
* You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0.
6+
* You may obtain a copy of the Apache License Version 2.0 at
7+
* http://www.apache.org/licenses/LICENSE-2.0.
78
*
89
* Unless required by applicable law or agreed to in writing,
910
* software distributed under the Apache License Version 2.0 is distributed on an
@@ -12,11 +13,8 @@
1213
*/
1314
packageorg.asynchttpclient.ws;
1415

15-
importstaticorg.asynchttpclient.util.MiscUtils.isNonEmpty;
16-
1716
importjava.util.ArrayList;
1817
importjava.util.List;
19-
importjava.util.concurrent.atomic.AtomicBoolean;
2018

2119
importorg.asynchttpclient.AsyncHandler;
2220
importorg.asynchttpclient.HttpResponseBodyPart;
@@ -26,85 +24,52 @@
2624
/**
2725
* An {@link AsyncHandler} which is able to execute WebSocket upgrade. Use the Builder for configuring WebSocket options.
2826
*/
29-
publicclassWebSocketUpgradeHandlerimplementsUpgradeHandler<WebSocket>,AsyncHandler<WebSocket> {
27+
publicclassWebSocketUpgradeHandlerimplementsAsyncHandler<WebSocket> {
3028

3129
privatestaticfinalintSWITCHING_PROTOCOLS =io.netty.handler.codec.http.HttpResponseStatus.SWITCHING_PROTOCOLS.code();
3230

3331
privateWebSocketwebSocket;
3432
privatefinalList<WebSocketListener>listeners;
35-
privatefinalAtomicBooleanok =newAtomicBoolean(false);
36-
privateintstatus;
37-
privateList<Runnable>bufferedFrames;
3833

3934
publicWebSocketUpgradeHandler(List<WebSocketListener>listeners) {
4035
this.listeners =listeners;
4136
}
4237

43-
publicvoidbufferFrame(RunnablebufferedFrame) {
44-
if (bufferedFrames ==null) {
45-
bufferedFrames =newArrayList<>(1);
46-
}
47-
bufferedFrames.add(bufferedFrame);
48-
}
49-
5038
@Override
51-
publicfinalvoidonThrowable(Throwablet) {
52-
onFailure(t);
39+
publicfinalStateonStatusReceived(HttpResponseStatusresponseStatus)throwsException {
40+
returnresponseStatus.getStatusCode() ==SWITCHING_PROTOCOLS ?State.CONTINUE :State.ABORT;
5341
}
5442

55-
5643
@Override
57-
publicfinalStateonBodyPartReceived(HttpResponseBodyPartbodyPart)throwsException {
44+
publicfinalStateonHeadersReceived(HttpResponseHeadersheaders)throwsException {
5845
returnState.CONTINUE;
5946
}
6047

6148
@Override
62-
publicfinalStateonStatusReceived(HttpResponseStatusresponseStatus)throwsException {
63-
status =responseStatus.getStatusCode();
64-
returnstatus ==SWITCHING_PROTOCOLS ?State.CONTINUE :State.ABORT;
65-
}
66-
67-
@Override
68-
publicfinalStateonHeadersReceived(HttpResponseHeadersheaders)throwsException {
49+
publicfinalStateonBodyPartReceived(HttpResponseBodyPartbodyPart)throwsException {
6950
returnState.CONTINUE;
7051
}
7152

7253
@Override
7354
publicfinalWebSocketonCompleted()throwsException {
74-
if (status !=SWITCHING_PROTOCOLS) {
75-
IllegalStateExceptione =newIllegalStateException("Invalid Status Code " +status);
76-
for (WebSocketListenerlistener :listeners) {
77-
listener.onError(e);
78-
}
79-
throwe;
80-
}
81-
8255
returnwebSocket;
8356
}
8457

8558
@Override
86-
publicfinalvoidonSuccess(WebSocketwebSocket) {
87-
this.webSocket =webSocket;
59+
publicfinalvoidonThrowable(Throwablet) {
8860
for (WebSocketListenerlistener :listeners) {
89-
webSocket.addWebSocketListener(listener);
90-
listener.onOpen(webSocket);
91-
}
92-
if (isNonEmpty(bufferedFrames)) {
93-
for (RunnablebufferedFrame :bufferedFrames) {
94-
bufferedFrame.run();
61+
if (webSocket !=null) {
62+
webSocket.addWebSocketListener(listener);
9563
}
96-
bufferedFrames =null;
64+
listener.onError(t);
9765
}
98-
ok.set(true);
9966
}
10067

101-
@Override
102-
publicfinalvoidonFailure(Throwablet) {
68+
publicfinalvoidopenWebSocket(WebSocketwebSocket) {
69+
this.webSocket =webSocket;
10370
for (WebSocketListenerlistener :listeners) {
104-
if (!ok.get() &&webSocket !=null) {
105-
webSocket.addWebSocketListener(listener);
106-
}
107-
listener.onError(t);
71+
webSocket.addWebSocketListener(listener);
72+
listener.onOpen(webSocket);
10873
}
10974
}
11075

‎client/src/test/java/org/asynchttpclient/ws/CloseCodeReasonMessageTest.java‎

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,10 @@
1212
*/
1313
packageorg.asynchttpclient.ws;
1414

15-
importstaticorg.asynchttpclient.Dsl.*;
15+
importstaticorg.asynchttpclient.Dsl.asyncHttpClient;
1616
importstaticorg.testng.Assert.*;
1717

18+
importjava.io.IOException;
1819
importjava.util.concurrent.CountDownLatch;
1920
importjava.util.concurrent.ExecutionException;
2021
importjava.util.concurrent.atomic.AtomicReference;
@@ -156,7 +157,7 @@ public void onError(Throwable t) {
156157
}
157158
}
158159

159-
@Test(groups ="online",timeOut =60000,expectedExceptions =IllegalStateException.class)
160+
@Test(groups ="online",timeOut =60000,expectedExceptions =IOException.class)
160161
publicvoidwrongProtocolCode()throwsThrowable {
161162
try (AsyncHttpClientc =asyncHttpClient()) {
162163
finalCountDownLatchlatch =newCountDownLatch(1);

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp