- Notifications
You must be signed in to change notification settings - Fork0
Library for composability of interdependent non-blocking I/O tasks
License
krupalshah/Composer
Folders and files
Name | Name | Last commit message | Last commit date | |
---|---|---|---|---|
Repository files navigation
Composer helps you to organize and execute multiple interdependent asynchronous input/outputtasks such as webservice calls, database read/writesand file i/o together with concurrency support usingjava.util.concurrent
APIs. It is compatible with Java 8 & above on all JVM based platforms including Android.
Here is an example of how you can use Composer to create a chain of tasks. Consider a scenario where you want to get an associated Twitter account details for your app user, fetch different kinds of twitter data for that user, show them on app UI and then track the event in your analytics database. All of these tasks are asynchronous (except refreshing the UI) and dependent on each other.
Composer.startWith(currentUser.getUserId(),err ->logger.error("Error executing tasks",err)) .thenExecute(userId -> {returnaccountService.getTwitterAccountDetails(userId); }) .thenContinueIf(response ->response.status.isOk()) .thenExecuteTogether(Results::new,response ->twitterService.getTweets(response.username),response ->twitterService.getMedia(response.username),response ->twitterService.getFollowers(response.username), ) .thenWaitFor(results -> {refreshUI(results); }) .thenExecute(() -> {analyticsDb.trackEvent("get_twitter_details"); }) .thenFinish();
Please note that Composer does not aim to provide an extensible API for managing asynchronous tasks. Instead, it aims to provide a minimal, easy to use API which can be useful for the scenarios where interdependency between such tasks forces you to write boilerplate code for managing state, validating conditions or handling errors. Most client-side mobile/web applications and backend services communicating with each other require an extensible framework in which interdependent asynchronous tasks can be glued together. Composer serves only specific use cases and it may not be a good fit all the use cases, especially when having an extensibe asynchronous framework is critical to the application.
For detailed usage information, please referGetting Started section.
- Gradle:
allprojects { repositories {... maven { url'https://jitpack.io' } }}dependencies { implementation'com.github.krupalshah:Composer:2.0.1'}
- Maven:
<repositories> <repository> <id>jitpack.io</id> <url>https://jitpack.io</url> </repository></repositories><dependency> <groupId>com.github.krupalshah</groupId> <artifactId>Composer</artifactId> <version>2.0.1</version></dependency>
The API consists of an interfaceComposable
and its implementationComposer
. The implementation serves as an entrypoint and returnsComposable
at each step of execution until chaining is discontinued.
UsestartWith()
to create your firstComposable
like below:
Composer.startWith(someInputOrTask,err ->logger.error("Error executing tasks",err))
The first param is only required if you want to pass some pre-known value as an input, or a task that may produce the same.
The second paramErrorStream
receives all errors during execution.
If you don't have any pre-known input or task, you can simply create your firstComposable
by just providing anErrorStream
like below:
Composer.startWith(err ->logger.error("Error executing tasks",err))
UsethenFinish()
to discontinue further chaining and complete the awaiting task execution. BetweenstartWith
andthenFinish
, chain your tasks according to their dependencies.
ATask
can be of type
SimpleTask
if it takes no input and returns no output.ConsumingTask<Input>
if it takes an input but returns no output.ProducingTask<Output>
if it takes no input but returns an output.TransformingTask<Input,Output>
if it takes an input and converts it into output.
Consider a very straightforward scenario in which some independent data is to be fetched from remote data source via webservice, converted into csv format, written to a file, and an email is to triggered when all of this is done.
Given this information, a chain can be as written as below:
Composer.startWith(() ->service.fetchData(),err ->logger.error("Error executing tasks",err)) .thenExecute(response -> {returnconverter.convertToCsv(response.data); }) .thenExecute(csv -> {writer.writeCsvFile(csv); }) .thenExecute(() -> {mailer.sendEmail("All Tasks Completed"); }) .thenFinish();
Each step returnsComposable
, which can be detached and glued wherever required:
Composable<String>myComposable =Composer.startWith(() ->service.fetchData(),err ->logger.error("Error executing tasks",err)) .thenExecute(response -> {returnconverter.convertToCsv(response.data); })doSomething();myComposable.thenExecute(csv -> {writer.writeCsvFile(csv); }) .thenExecute(() -> {mailer.sendEmail("All Tasks Completed"); }) .thenFinish();
Please note that chained tasks are executed asynchronously by default. Hence, in the above example there is no guarantee thatdoSomething()
will be run after the data is converted to csv. If something needs to be executed synchronously in-between, chain it as specified underExecuting task synchronously section.
Different method variants have been provided to execute multiple tasks concurrently. All you have to do is to specify a collection of tasks to be executed in parallel. The order of execution is never guaranteed.
Consider a slight modification in the previous scenario where converted csv is persisted in the database along with a file.
In that case, both tasks can be executed concurrently usingthenExecuteTogether()
variants like below:
Composer.startWith(() ->service.fetchData(),err ->logger.error("Error executing tasks",err)) .thenExecute(response -> {returnconverter.convertToCsv(response.data); }) .thenExecuteTogether(csv ->writer.writeCsvFile(csv),db.storeCsv(csv)//both tasks will be executed concurrently ) .thenExecute(() -> {mailer.sendEmail("All Tasks Completed"); }) .thenFinish();
In the cases where a task produces an output, concurrent variants can execute any number of tasks with the same type of output, or maximum three tasks with different types of output.
Such tasks will require aCollector
as an additional parameter. ACollector
collects results from multiple producer tasks and returns something which can hold those results.
Consider a modification in the first scenario where data is to be converted into multiple formats such as csv, xml and yaml. In that case, we can use concurrent method variants and collect results like below:
Composer.startWith(() ->service.fetchData(),err ->logger.error("Error executing tasks",err)) .thenExecuteTogether( (response,csv,xml,yaml) ->newConvertedData(csv,xml,yaml),//ConvertedData is a pojo returned from collector to hold outputs from concurrently executing tasksresponse ->converter.convertToCsv(response.data),response ->converter.convertToXml(response.data),response ->converter.convertToYaml(response.data) ) .thenExecuteTogether(convertedData ->writer.writeCsvFile(convertedData.csv),convertedData ->writer.writeXmlFile(convertedData.xml),convertedData ->writer.writeYamlFile(convertedData.yaml) ) .thenExecute(() ->mailer.sendEmail("All Tasks Completed")) .thenFinish();
In the cases where an upstream output contains a collection, and you want to execute a task concurrently for each value in that collection, usethenExecuteForEach()
variants.
Consider a scenario where you need to fetch some posts from a service and then fetch comments for each post in the response. In that case, you will need to expand the upstream response to a collection of posts, provide the task to be executed concurrently for each post and finally collect the comments grouped by posts like below:
Composer.startWith(() ->service.fetchPosts(),err ->logger.error("Error executing tasks",err)) .thenExecuteForEach(response ->response.getPosts(),//provide a collection to iterate overpost ->service.fetchComments(post),//this task will be applied for each post in the list (response,postAndComments) ->newGroupedData(postAndComments)//collector will receive results as pairs of <Post,List<Comment>> assuming that the service is retuning the list of comments for a specific post ) .thenExecute(data -> {db.insertPostsAndComments(data); }) .thenExecute(() -> {mailer.sendEmail("All Tasks Completed"); }) .thenFinish();
A task output must benon-null
. Any task in a chain that receivesnull
as an input will discontinue further execution.
UsethenContinueIf()
to validate the task output before it is used as an input of dependent tasks. If the condition specified returns false, you will receive aComposerException
on theErrorStream
provided. Further execution will be discontinued and downstream consuming tasks will receivenull
as a result.
For example, in the first scenario, consider that you want to check the status and size of the data in response before converting to csv:
Composer.startWith(() ->service.fetchData(),err ->logger.error("Error executing tasks",err)) .thenContinueIf(response ->response.status.isOk() && !response.data.isEmpty())//this will discontinue further execution if the specified condition returns false. .thenExecute(response -> {returnconverter.convertToCsv(response.data); }) .thenExecute(csv -> {writer.writeCsvFile(csv); }) .thenExecute(() -> {mailer.sendEmail("All Tasks Completed"); }) .thenFinish();
By default, all tasks will be executed asynchronously. If you want to execute something synchronously on the same thread the method has been called (in most cases - the application main thread),thenWaitFor
variants can be used like below:
Composer.startWith(() ->produceSomething(),err ->logger.error("Error executing tasks",err)) .thenWaitFor(data -> {showOnUI(data); }) .thenFinish();
Finally, Composer uses anExecutorService
that creates a cached thread pool internally. If you want to provide your custom executor service, pass it as a third param ofstartWith()
like below (not recommended unless required):
Composer.startWith(() ->produceSomething(),err ->logger.error("Error executing tasks",err),customExecutorService)
- Minor changes to avoid compiler warnings.
- This release contains breaking changes. Major API refactorings include renaming all methods to reduce verbosity.
- Collection parameters in
then..Together
variants have been replaced with varargs.
- Fixed a bug where an
ErrorStream
was not transmitting errors synchronously.
- Fixed a bug where an
Copyright 2020 Krupal ShahLicensed under the Apache License, Version 2.0 (the "License");you may not use this file except in compliance with the License.You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0Unless required by applicable law or agreed to in writing, softwaredistributed under the License is distributed on an "AS IS" BASIS,WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.See the License for the specific language governing permissions andlimitations under the License.
About
Library for composability of interdependent non-blocking I/O tasks