1010 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1111 * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
1212 */
13- package org .asynchttpclient .netty . handler ;
13+ package org .asynchttpclient .reactivestreams ;
1414
1515import static org .asynchttpclient .Dsl .asyncHttpClient ;
1616import static org .asynchttpclient .test .TestUtils .LARGE_IMAGE_BYTES ;
2525import java .util .concurrent .CountDownLatch ;
2626import java .util .concurrent .atomic .AtomicReference ;
2727
28+ import org .asynchttpclient .AbstractBasicTest ;
2829import org .asynchttpclient .AsyncHttpClient ;
2930import org .asynchttpclient .HttpResponseBodyPart ;
3031import org .asynchttpclient .handler .AsyncHandlerExtensions ;
3132import org .asynchttpclient .netty .handler .StreamedResponsePublisher ;
3233import org .asynchttpclient .netty .request .NettyRequest ;
33- import org .asynchttpclient .reactivestreams .ReactiveStreamsTest ;
34+ import org .asynchttpclient .reactivestreams .ReactiveStreamsTest .SimpleStreamedAsyncHandler ;
35+ import org .asynchttpclient .reactivestreams .ReactiveStreamsTest .SimpleSubscriber ;
3436import org .reactivestreams .Publisher ;
3537import org .slf4j .Logger ;
3638import org .slf4j .LoggerFactory ;
3739import org .testng .annotations .Test ;
3840
39- public class NettyReactiveStreamsTest extends ReactiveStreamsTest {
41+ public class FailingReactiveStreamsTest extends AbstractBasicTest {
4042
4143@ Test (groups ="standalone" )
4244public void testRetryingOnFailingStream ()throws Exception {
@@ -45,20 +47,17 @@ public void testRetryingOnFailingStream() throws Exception {
4547final CountDownLatch streamOnHold =new CountDownLatch (1 );// allows us to hold the subscriber from processing further body chunks
4648final CountDownLatch replayingRequest =new CountDownLatch (1 );// allows us to block until the request is being replayed ( this is what we want to test here!)
4749
48- // a ref to the publisher is needed to get a hold on the channel (if there is a better way, this should be changed)
50+ // a ref to the publisher is needed to get a hold on the channel (if there is a better way, this should be changed)
4951final AtomicReference <StreamedResponsePublisher >publisherRef =new AtomicReference <>(null );
5052
5153// executing the request
52- client .preparePost (getTargetUrl ())
53- .setBody (LARGE_IMAGE_BYTES )
54- .execute (new ReplayedSimpleAsyncHandler (replayingRequest ,
55- new BlockedStreamSubscriber (streamStarted ,streamOnHold )) {
54+ client .preparePost (getTargetUrl ()).setBody (LARGE_IMAGE_BYTES )
55+ .execute (new ReplayedSimpleAsyncHandler (replayingRequest ,new BlockedStreamSubscriber (streamStarted ,streamOnHold )) {
5656@ Override
5757public State onStream (Publisher <HttpResponseBodyPart >publisher ) {
58- if (!(publisher instanceof StreamedResponsePublisher )) {
58+ if (!(publisher instanceof StreamedResponsePublisher )) {
5959throw new IllegalStateException (String .format ("publisher %s is expected to be an instance of %s" ,publisher ,StreamedResponsePublisher .class ));
60- }
61- else if (!publisherRef .compareAndSet (null , (StreamedResponsePublisher )publisher )) {
60+ }else if (!publisherRef .compareAndSet (null , (StreamedResponsePublisher )publisher )) {
6261// abort on retry
6362return State .ABORT ;
6463 }
@@ -87,7 +86,7 @@ public void operationComplete(ChannelFuture future) throws Exception {
8786// now we expect a new connection to be created and AHC retry logic to kick-in automatically
8887replayingRequest .await ();// wait until we are notified the request is being replayed
8988
90- // Change this if there is a better way of stating the test succeeded
89+ // Change this if there is a better way of stating the test succeeded
9190assertTrue (true );
9291 }
9392 }
@@ -119,40 +118,70 @@ public void onNext(HttpResponseBodyPart t) {
119118super .onNext (t );
120119 }
121120 }
122-
121+
123122private static class ReplayedSimpleAsyncHandler extends SimpleStreamedAsyncHandler implements AsyncHandlerExtensions {
124123private final CountDownLatch replaying ;
124+
125125public ReplayedSimpleAsyncHandler (CountDownLatch replaying ,SimpleSubscriber <HttpResponseBodyPart >subscriber ) {
126126super (subscriber );
127127this .replaying =replaying ;
128128 }
129+
129130@ Override
130- public void onHostnameResolutionAttempt (String name ) {}
131+ public void onHostnameResolutionAttempt (String name ) {
132+ }
133+
131134@ Override
132- public void onHostnameResolutionSuccess (String name ,List <InetSocketAddress >addresses ) {}
135+ public void onHostnameResolutionSuccess (String name ,List <InetSocketAddress >addresses ) {
136+ }
137+
133138@ Override
134- public void onHostnameResolutionFailure (String name ,Throwable cause ) {}
139+ public void onHostnameResolutionFailure (String name ,Throwable cause ) {
140+ }
141+
135142@ Override
136- public void onTcpConnectAttempt (InetSocketAddress address ) {}
143+ public void onTcpConnectAttempt (InetSocketAddress address ) {
144+ }
145+
137146@ Override
138- public void onTcpConnectSuccess (InetSocketAddress address ,Channel connection ) {}
147+ public void onTcpConnectSuccess (InetSocketAddress address ,Channel connection ) {
148+ }
149+
139150@ Override
140- public void onTcpConnectFailure (InetSocketAddress address ,Throwable cause ) {}
151+ public void onTcpConnectFailure (InetSocketAddress address ,Throwable cause ) {
152+ }
153+
141154@ Override
142- public void onTlsHandshakeAttempt () {}
155+ public void onTlsHandshakeAttempt () {
156+ }
157+
143158@ Override
144- public void onTlsHandshakeSuccess () {}
159+ public void onTlsHandshakeSuccess () {
160+ }
161+
145162@ Override
146- public void onTlsHandshakeFailure (Throwable cause ) {}
163+ public void onTlsHandshakeFailure (Throwable cause ) {
164+ }
165+
147166@ Override
148- public void onConnectionPoolAttempt () {}
167+ public void onConnectionPoolAttempt () {
168+ }
169+
149170@ Override
150- public void onConnectionPooled (Channel connection ) {}
171+ public void onConnectionPooled (Channel connection ) {
172+ }
173+
151174@ Override
152- public void onConnectionOffer (Channel connection ) {}
175+ public void onConnectionOffer (Channel connection ) {
176+ }
177+
153178@ Override
154- public void onRequestSend (NettyRequest request ) {}
179+ public void onRequestSend (NettyRequest request ) {
180+ }
181+
155182@ Override
156- public void onRetry () {replaying .countDown (); }
183+ public void onRetry () {
184+ replaying .countDown ();
185+ }
157186 }
158187}