1616import static org .asynchttpclient .Dsl .*;
1717import static org .asynchttpclient .test .TestUtils .LARGE_IMAGE_BYTES ;
1818import static org .testng .Assert .assertEquals ;
19+ import io .netty .buffer .ByteBuf ;
20+ import io .netty .buffer .Unpooled ;
1921import io .netty .handler .codec .http .HttpHeaders ;
2022
2123import java .io .ByteArrayOutputStream ;
22- import java .nio .ByteBuffer ;
24+ import java .io .File ;
25+ import java .io .IOException ;
2326import java .util .ArrayList ;
2427import java .util .Collections ;
28+ import java .util .Enumeration ;
2529import java .util .Iterator ;
2630import java .util .List ;
2731import java .util .concurrent .CountDownLatch ;
2832import java .util .concurrent .ExecutionException ;
2933
30- import org .asynchttpclient .AbstractBasicTest ;
34+ import javax .servlet .ServletException ;
35+ import javax .servlet .http .Cookie ;
36+ import javax .servlet .http .HttpServlet ;
37+ import javax .servlet .http .HttpServletRequest ;
38+ import javax .servlet .http .HttpServletResponse ;
39+
40+ import org .apache .catalina .Context ;
41+ import org .apache .catalina .startup .Tomcat ;
42+ import org .apache .commons .io .IOUtils ;
3143import org .asynchttpclient .AsyncHttpClient ;
3244import org .asynchttpclient .BoundRequestBuilder ;
3345import org .asynchttpclient .HttpResponseBodyPart ;
3951import org .reactivestreams .Publisher ;
4052import org .reactivestreams .Subscriber ;
4153import org .reactivestreams .Subscription ;
54+ import org .slf4j .Logger ;
55+ import org .slf4j .LoggerFactory ;
56+ import org .testng .annotations .AfterClass ;
57+ import org .testng .annotations .BeforeClass ;
4258import org .testng .annotations .Test ;
4359
4460import rx .Observable ;
4561import rx .RxReactiveStreams ;
4662
47- public class ReactiveStreamsTest extends AbstractBasicTest {
63+ public class ReactiveStreamsTest {
64+
65+ private static final Logger LOGGER =LoggerFactory .getLogger (ReactiveStreamsTest .class );
4866
49- public static Publisher <ByteBuffer >createPublisher (final byte []bytes ,final int chunkSize ) {
50- Observable <ByteBuffer >observable =Observable .from (new ByteBufferIterable (bytes ,chunkSize ));
67+ public static Publisher <ByteBuf >createPublisher (final byte []bytes ,final int chunkSize ) {
68+ Observable <ByteBuf >observable =Observable .from (new ByteBufIterable (bytes ,chunkSize ));
5169return RxReactiveStreams .toPublisher (observable );
5270 }
5371
72+ private Tomcat tomcat ;
73+ private int port1 ;
74+
75+ @ SuppressWarnings ("serial" )
76+ @ BeforeClass (alwaysRun =true )
77+ public void setUpGlobal ()throws Exception {
78+
79+ String path =new File ("." ).getAbsolutePath () +"/target" ;
80+
81+ tomcat =new Tomcat ();
82+ tomcat .setHostname ("localhost" );
83+ tomcat .setPort (0 );
84+ tomcat .setBaseDir (path );
85+ Context ctx =tomcat .addContext ("" ,path );
86+
87+ Tomcat .addServlet (ctx ,"webdav" ,new HttpServlet () {
88+
89+ @ Override
90+ public void service (HttpServletRequest httpRequest ,HttpServletResponse httpResponse )throws ServletException ,IOException {
91+ LOGGER .debug ("Echo received request {} on path {}" ,httpRequest ,httpRequest .getServletContext ().getContextPath ());
92+
93+ if (httpRequest .getHeader ("X-HEAD" ) !=null ) {
94+ httpResponse .setContentLength (1 );
95+ }
96+
97+ if (httpRequest .getHeader ("X-ISO" ) !=null ) {
98+ httpResponse .setContentType (TestUtils .TEXT_HTML_CONTENT_TYPE_WITH_ISO_8859_1_CHARSET );
99+ }else {
100+ httpResponse .setContentType (TestUtils .TEXT_HTML_CONTENT_TYPE_WITH_UTF_8_CHARSET );
101+ }
102+
103+ if (httpRequest .getMethod ().equalsIgnoreCase ("OPTIONS" )) {
104+ httpResponse .addHeader ("Allow" ,"GET,HEAD,POST,OPTIONS,TRACE" );
105+ }
106+
107+ Enumeration <String >e =httpRequest .getHeaderNames ();
108+ String headerName ;
109+ while (e .hasMoreElements ()) {
110+ headerName =e .nextElement ();
111+ if (headerName .startsWith ("LockThread" )) {
112+ final int sleepTime =httpRequest .getIntHeader (headerName );
113+ try {
114+ Thread .sleep (sleepTime == -1 ?40 :sleepTime *1000 );
115+ }catch (InterruptedException ex ) {
116+ }
117+ }
118+
119+ if (headerName .startsWith ("X-redirect" )) {
120+ httpResponse .sendRedirect (httpRequest .getHeader ("X-redirect" ));
121+ return ;
122+ }
123+ httpResponse .addHeader ("X-" +headerName ,httpRequest .getHeader (headerName ));
124+ }
125+
126+ String pathInfo =httpRequest .getPathInfo ();
127+ if (pathInfo !=null )
128+ httpResponse .addHeader ("X-pathInfo" ,pathInfo );
129+
130+ String queryString =httpRequest .getQueryString ();
131+ if (queryString !=null )
132+ httpResponse .addHeader ("X-queryString" ,queryString );
133+
134+ httpResponse .addHeader ("X-KEEP-ALIVE" ,httpRequest .getRemoteAddr () +":" +httpRequest .getRemotePort ());
135+
136+ Cookie []cs =httpRequest .getCookies ();
137+ if (cs !=null ) {
138+ for (Cookie c :cs ) {
139+ httpResponse .addCookie (c );
140+ }
141+ }
142+
143+ Enumeration <String >i =httpRequest .getParameterNames ();
144+ if (i .hasMoreElements ()) {
145+ StringBuilder requestBody =new StringBuilder ();
146+ while (i .hasMoreElements ()) {
147+ headerName =i .nextElement ();
148+ httpResponse .addHeader ("X-" +headerName ,httpRequest .getParameter (headerName ));
149+ requestBody .append (headerName );
150+ requestBody .append ("_" );
151+ }
152+
153+ if (requestBody .length () >0 ) {
154+ String body =requestBody .toString ();
155+ httpResponse .getOutputStream ().write (body .getBytes ());
156+ }
157+ }
158+
159+ String requestBodyLength =httpRequest .getHeader ("X-" +CONTENT_LENGTH );
160+
161+ if (requestBodyLength !=null ) {
162+ byte []requestBodyBytes =IOUtils .toByteArray (httpRequest .getInputStream ());
163+ int total =requestBodyBytes .length ;
164+
165+ httpResponse .addIntHeader ("X-" +CONTENT_LENGTH ,total );
166+ String md5 =TestUtils .md5 (requestBodyBytes ,0 ,total );
167+ httpResponse .addHeader (CONTENT_MD5 .toString (),md5 );
168+
169+ httpResponse .getOutputStream ().write (requestBodyBytes ,0 ,total );
170+ }else {
171+ int size =16384 ;
172+ if (httpRequest .getContentLength () >0 ) {
173+ size =httpRequest .getContentLength ();
174+ }
175+ if (size >0 ) {
176+ int read =0 ;
177+ while (read > -1 ) {
178+ byte []bytes =new byte [size ];
179+ read =httpRequest .getInputStream ().read (bytes );
180+ if (read >0 ) {
181+ httpResponse .getOutputStream ().write (bytes ,0 ,read );
182+ }
183+ }
184+ }
185+ }
186+
187+ httpResponse .getOutputStream ().flush ();
188+ // FIXME don't always close, depends on the test, cf ReactiveStreamsTest
189+ // httpResponse.getOutputStream().close();
190+ }
191+ });
192+ ctx .addServletMappingDecoded ("/*" ,"webdav" );
193+ tomcat .start ();
194+ port1 =tomcat .getConnector ().getLocalPort ();
195+ }
196+
197+ @ AfterClass (alwaysRun =true )
198+ public void tearDownGlobal ()throws InterruptedException ,Exception {
199+ tomcat .stop ();
200+ }
201+
202+ private String getTargetUrl () {
203+ return String .format ("http://localhost:%d/foo/test" ,port1 );
204+ }
205+
54206@ Test (groups ="standalone" )
55207public void testStreamingPutImage ()throws Exception {
56208try (AsyncHttpClient client =asyncHttpClient (config ().setRequestTimeout (100 *6000 ))) {
@@ -114,8 +266,8 @@ public static void main(String[] args) throws Exception {
114266@ Test (groups ="standalone" ,expectedExceptions =ExecutionException .class )
115267public void testFailingStream ()throws Exception {
116268try (AsyncHttpClient client =asyncHttpClient (config ().setRequestTimeout (100 *6000 ))) {
117- Observable <ByteBuffer >failingObservable =Observable .error (new FailedStream ());
118- Publisher <ByteBuffer >failingPublisher =RxReactiveStreams .toPublisher (failingObservable );
269+ Observable <ByteBuf >failingObservable =Observable .error (new FailedStream ());
270+ Publisher <ByteBuf >failingPublisher =RxReactiveStreams .toPublisher (failingObservable );
119271
120272client .preparePut (getTargetUrl ()).setBody (failingPublisher ).execute ().get ();
121273 }
@@ -337,18 +489,18 @@ public void onComplete() {
337489 }
338490 }
339491
340- static class ByteBufferIterable implements Iterable <ByteBuffer > {
492+ static class ByteBufIterable implements Iterable <ByteBuf > {
341493private final byte []payload ;
342494private final int chunkSize ;
343495
344- public ByteBufferIterable (byte []payload ,int chunkSize ) {
496+ public ByteBufIterable (byte []payload ,int chunkSize ) {
345497this .payload =payload ;
346498this .chunkSize =chunkSize ;
347499 }
348500
349501@ Override
350- public Iterator <ByteBuffer >iterator () {
351- return new Iterator <ByteBuffer >() {
502+ public Iterator <ByteBuf >iterator () {
503+ return new Iterator <ByteBuf >() {
352504private volatile int currentIndex =0 ;
353505
354506@ Override
@@ -357,11 +509,11 @@ public boolean hasNext() {
357509 }
358510
359511@ Override
360- public ByteBuffer next () {
512+ public ByteBuf next () {
361513int thisCurrentIndex =currentIndex ;
362514int length =Math .min (chunkSize ,payload .length -thisCurrentIndex );
363515currentIndex +=length ;
364- return ByteBuffer . wrap (payload ,thisCurrentIndex ,length );
516+ return Unpooled . wrappedBuffer (payload ,thisCurrentIndex ,length );
365517 }
366518
367519@ Override