DEV Community

Enrique Zamudio
Streaming large queries in Java

I've been using the JdbcTemplate class since version 1.0, and it has evolved nicely, but I was hoping that version 5 would include some streaming capabilities for queries with large results. Alas, that didn't happen.

Still, sometimes I need to perform queries that return millions of rows, and I can't use the JdbcTemplate methods that return lists for that. A RowCallbackHandler is perfect for the job, but it would be so much nicer to just receive a Stream, wouldn't it? Especially if you have custom RowMappers...

So, I decided to write my own Stream generator to use with a JdbcTemplate. In the process, I ended up creating a more generic Stream generator, which I think is good, and so I want to share it with anyone who needs something similar. I don't think it's enough material for a library, though, so I decided to write a post about it instead.

The challenge

First of all, we need to consider that streams are lazy: when you get a stream and define the operations to be performed on it, nothing happens until you invoke a terminal operation, which actually traverses the elements and applies the operations to them. There are terminal operations that go through the entire stream (such as count, or collecting the elements into another collection), and there are short-circuiting ones (such as determining whether any element passes some filter).
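To see that laziness in action, here is a small self-contained sketch (the names are mine, not from the article): a peek counts how many elements are actually pulled from the source, and the short-circuiting findFirst stops as soon as it has an answer.

```java
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;

public class LazyStreams {
    // Counts how many elements a short-circuiting terminal operation actually visits.
    static int visitedBeforeFirstEven() {
        AtomicInteger visited = new AtomicInteger();
        Optional<Integer> first = List.of(1, 2, 3, 4, 5).stream()
                .peek(n -> visited.incrementAndGet()) // runs only when an element is pulled
                .filter(n -> n % 2 == 0)
                .findFirst(); // short-circuiting terminal operation
        return visited.get(); // only 1 and 2 were pulled before the first match
    }

    public static void main(String[] args) {
        System.out.println(visitedBeforeFirstEven()); // 2
    }
}
```

Until findFirst runs, nothing at all is read from the source list; that is the behavior we want to preserve for the database query.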

So we want to get a stream and define operations on it, and nothing should happen until the moment the stream needs to be traversed; only then should the query run (which implies having an open connection to the database). If something bad happens, the query needs to stop (and JdbcTemplate will take care of cleaning up the connection and other resources).

The only way I found to make this work is by using two threads: a producer thread, in which the query runs and the rows are somehow fed to the stream, and a consumer thread, which reads from the stream.

We will need a buffer in which the producer stores elements and from which the consumer takes them. A LinkedBlockingQueue seems perfect for this.
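The key property we rely on is that the queue is bounded: the timed offer fails (returns false) once capacity is reached instead of blocking forever, which is how the producer below will detect a consumer that has stopped reading. A quick sketch of that behavior:

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class BoundedQueueDemo {
    // Returns whether a third offer on a capacity-2 queue succeeds (it should not).
    static boolean thirdOfferFits() throws InterruptedException {
        LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>(2);
        queue.offer("a", 10, TimeUnit.MILLISECONDS); // true: fits
        queue.offer("b", 10, TimeUnit.MILLISECONDS); // true: fits
        // The queue is now at capacity, so this times out and returns false
        return queue.offer("c", 10, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(thirdOfferFits()); // false: the queue is full
    }
}
```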

So, without further ado, here it is:

```java
import java.nio.BufferOverflowException;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public static <T> Stream<T> streamForQuery(int bufferSize, T endOfStreamMarker,
        Consumer<Consumer<T>> query) {
    final LinkedBlockingQueue<T> queue = new LinkedBlockingQueue<>(bufferSize);
    //This is the consumer that is usually passed to queries;
    //it will receive each item from the query and put it in the queue
    Consumer<T> filler = t -> {
        try {
            //Try to add to the queue, waiting up to 1 second.
            //Honestly, if after 1 second the queue is still full, either the stream
            //consumer needs some serious optimization or, more likely, a short-circuiting
            //terminal operation was performed on the stream.
            if (!queue.offer(t, 1, TimeUnit.SECONDS)) {
                //If the queue is still full after 1 second, time out.
                //Throw an exception to stop the producer thread.
                //(log is assumed to be the enclosing class's logger)
                log.error("Timeout waiting to feed elements to stream");
                throw new BufferOverflowException();
            }
        } catch (InterruptedException ex) {
            System.err.println("Interrupted trying to add item to stream");
            ex.printStackTrace();
        }
    };
    //For the stream that we return, we use a Spliterator.
    return StreamSupport.stream(() -> new Spliterators.AbstractSpliterator<T>(
            Long.MAX_VALUE, Spliterator.ORDERED) {

        //We need to know if the producer thread has been started
        private boolean started = false;
        //If there's an exception in the producer, keep it here
        private volatile Throwable boom;

        /** This method is called once, before advancing to the first element.
         * It starts the producer thread, which runs the query, passing it our
         * queue filler.
         */
        private void startProducer() {
            //Get the consumer thread
            Thread interruptMe = Thread.currentThread();
            //The first time this is called, run the query in a separate thread.
            //This is the producer thread.
            new Thread(() -> {
                try {
                    //Run the query, with our special consumer
                    query.accept(filler);
                } catch (BufferOverflowException ignore) {
                    //The filler threw this; it means the queue is not being consumed
                    //fast enough (or, more likely, not at all)
                } catch (Throwable thr) {
                    //Something bad happened; store the exception and interrupt the reader
                    boom = thr;
                    interruptMe.interrupt();
                }
            }).start();
            started = true;
        }

        @Override
        public boolean tryAdvance(Consumer<? super T> action) {
            if (!started) {
                startProducer();
            }
            try {
                //Take an item from the queue, and if it's not the end-of-stream marker,
                //pass it to the action consumer.
                T t = queue.take();
                if (t != endOfStreamMarker) {
                    action.accept(t);
                    return true;
                }
            } catch (InterruptedException ex) {
                if (boom == null) {
                    System.err.println("Interrupted reading from stream");
                    ex.printStackTrace();
                } else {
                    //Rethrow the exception from the producer on the consumer side
                    throw new RuntimeException(boom);
                }
            }
            return false;
        }
    }, Spliterator.IMMUTABLE, false);
}
```

And this is how you use it, with a JdbcTemplate:

```java
final MyRow marker = new MyRow();
Stream<MyRow> stream = streamForQuery(100, marker, callback -> {
    //Pass a RowCallbackHandler that passes a MyRow to the callback
    jdbcTemplate.query("SELECT * FROM really_big_table_with_millions_of_rows",
            rs -> {
                callback.accept(myRowMapper.mapRow(rs, 0));
            });
    //Pass the marker to the callback, to signal end of stream
    callback.accept(marker);
});
```
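A side note on the marker object: a dedicated sentinel is needed (rather than, say, null) because BlockingQueue implementations reject null elements outright. A quick sketch demonstrating this:

```java
import java.util.concurrent.LinkedBlockingQueue;

public class SentinelDemo {
    // Returns whether a null element can be placed on the queue (it can't).
    static boolean nullAllowed() {
        LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>();
        try {
            queue.offer(null); // BlockingQueue implementations throw NPE on null
            return true;
        } catch (NullPointerException expected) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(nullAllowed()); // false -- hence a dedicated marker instance,
                                           // which the consumer compares by identity
    }
}
```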

At this point, the query hasn't been performed. You can do stuff such as:

stream = stream.filter(row -> row.isPretty());

And still nothing happens. When you do something like this though:

Optional<MyRow> row = stream.skip(100_000).limit(1000).findAny();

Then the query is executed: rows are read and passed through the filter, the first hundred thousand pretty rows are skipped, and the traversal stops as soon as one more pretty row is found.
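You can verify that behavior without a database. In this sketch (ignoring the filter step) an infinite generated stream stands in for the query, and a peek counts how many elements the terminal operation actually pulls; findAny stops right after the skipped prefix instead of consuming the whole (infinite) source.

```java
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;

public class ShortCircuitDemo {
    // Counts how many elements skip(100_000).limit(1_000).findAny() pulls
    // from a sequential source.
    static int elementsPulled() {
        AtomicInteger produced = new AtomicInteger();
        Optional<Integer> row = Stream.iterate(0, n -> n + 1) // infinite source
                .peek(n -> produced.incrementAndGet())        // count each element pulled
                .skip(100_000)
                .limit(1_000)
                .findAny();
        return produced.get();
    }

    public static void main(String[] args) {
        // The skipped 100,000 plus the single element findAny needed:
        System.out.println(elementsPulled()); // 100001
    }
}
```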

With great power...

Please, please, PLEASE don't use this as a substitute for a good WHERE clause and properly indexing your tables. I've used this thing mainly to generate reports, concatenating streams of disjoint types by mapping the elements to a common type for further processing (basically, making up for the lack of union types in Java).
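That report use case, concatenating streams of disjoint types after mapping each to a common type, can be sketched like this (the Invoice and Payment types are made-up examples, not from the article):

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class ConcatDemo {
    record Invoice(String number) {}
    record Payment(String reference) {}

    // Map each disjoint type to a common one (String, here) before concatenating,
    // since Java has no union types.
    static List<String> reportLines(List<Invoice> invoices, List<Payment> payments) {
        return Stream.concat(
                invoices.stream().map(i -> "INV " + i.number()),
                payments.stream().map(p -> "PAY " + p.reference())
        ).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(reportLines(
                List.of(new Invoice("001")),
                List.of(new Payment("X9")))); // [INV 001, PAY X9]
    }
}
```

With the streaming query generator above, each of the concatenated streams can be backed by its own query, and the rows are still only read as the report consumes them.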

Having said that, it is pretty cool to be able to read rows from a database in a streaming fashion.
I guess this could be integrated into Spring's JdbcTemplate, and/or jOOQ...

Top comments (5)

Brian McGowan

I found that the LinkedBlockingQueue fills up to whatever limit it's set to, say 100 in this example, and then it throws a buffer overflow exception which is gracefully swallowed. It would be nice if you could pop off the queue once you've consumed an element from the stream.

Enrique Zamudio (Author)
Author of j8583 and jAlarms. Java Champion since 2015; worked on the Ceylon to JavaScript compiler 2012-2017, moved to fintech since.
Not sure what you mean by popping off the queue once the element's been consumed... queue.take() is called inside tryAdvance, which removes the first element from the queue and passes it to the stream's consumer...

jayaerrabelli

I need full working code, it would be of great help to me! Please...

jayaerrabelli

This is exactly what I was looking for. Thank you so much. What do MyRow and the MyRow wrapper contain? Where can I see the full code?

Enrique Zamudio (Author)

MyRow is the class of the objects you want to stream. It can be whatever. There's no MyRow Wrapper, but there's a RowMapper implementation that produces MyRow instances from the ResultSet. That's basic JdbcTemplate stuff that's out of the scope of this article.
