Are you planning to run “some kind of workflow” and don’t want to worry about infrastructure, monitoring, or resumability? Do you want to orchestrate different kinds of tasks, like Python scripts, command-line tools, or applications written in other programming languages? Yes? Then stay with us and have a look at Nextflow!
Nextflow is open-source software developed by Seqera Labs. We’ve found it especially useful for data-intensive computational pipelines where you want to string together command-line tools and code artifacts. We’ll demonstrate how to accomplish this with some basic concepts and then look at one of the pipelines we are running at 23andMe. Let’s get started!
By the end of this read, you will be familiar with Nextflow scripting and some of the features we use for our imputation pipelines. These principles can be adapted for your own use cases.
The Nextflow scripting language is an extension of Groovy and is quite extensive, so we will focus on a few key concepts to help get started. Many of the definitions and examples here are borrowed from the Nextflow docs. These are a great resource with plenty of examples and can expand on the starter code we’ll be providing.
The Nextflow language is based on the Dataflow Programming Model. In this context, that’s a fancy way of saying “you can visualize it as the flow of data through a directed acyclic graph (DAG).” The arrows of the DAG are asynchronous FIFO queues from which data can be consumed. These are called Channels in Nextflow. The nodes of the DAG consume from these Channels, perform some tasks, and produce data outputs. These nodes are called Processes in Nextflow.
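To make the dataflow idea concrete outside of Nextflow, here is a tiny plain-Python sketch (none of this is Nextflow API): two “processes” connected by a FIFO queue playing the role of a channel.

```python
# Plain-Python illustration of the dataflow model (not Nextflow API):
# a FIFO queue plays the role of a Channel between two Processes.
from queue import Queue

channel_x = Queue()  # "Channel X": unidirectional FIFO from A to B

def process_a(items):
    """Process A: produces data into the channel."""
    for item in items:
        channel_x.put(item)

def process_b():
    """Process B: consumes from the channel and transforms each element."""
    results = []
    while not channel_x.empty():
        results.append(channel_x.get() * 2)
    return results

process_a([1, 2, 3])
doubled = process_b()
print(doubled)  # [2, 4, 6]
```

In real Nextflow the consumption is asynchronous and managed by the framework; this sketch only shows the direction of data flow.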
At the risk of oversimplifying, you can think of Nextflow Channels as queues that hold data for a Process to read from. Nextflow offers two types of channels called Queue Channels and Value Channels. We will demonstrate the difference between the two below.
To explicitly create a Channel, you would use one of the Nextflow-provided Channel Factory methods. Go here for a full list of Channel factory methods.
Queue Channels are asynchronous unidirectional FIFO queues. So let’s break that down:
If `Channel X` connects `Process A` to `Process B`, then `Process A` can send data to `Process B` via `Channel X`, but `Process B` cannot send data back to `Process A` via the same `Channel X`.
Here are a few examples of Channel Factory methods used to create Queue Channels:
of

The `of` factory method allows you to create a queue channel that emits a sequence of values defined in the method parameters. For example:
ch = Channel.of(1, 3, 5, 7)
Check out the Processes — Generic Inputs section below to see how this channel might get consumed.
fromPath

The `fromPath` factory method creates Channels that emit one or more file paths. Here’s a basic example for a single file path:

file_ch = Channel.fromPath('data/example.txt')
`fromPath` also interprets glob patterns, so you can easily emit multiple files from the same channel. For example:

file_ch = Channel.fromPath('data/*.txt')
Nextflow’s Value Channels are similar to Queue Channels in that they are also asynchronous, unidirectional, and created using Channel Factory methods. However, a Value Channel is bound to a single value and can be read an unlimited number of times without consuming its content.
value

To create a value channel, Nextflow offers a straightforward channel factory method called `value`, which can be used like this:

pi = Channel.value('3.1415')
The docs say it best: “In Nextflow a process is the basic processing primitive to execute a user script.” In other words, a Nextflow process is basically an isolated user-defined script, wrapped with additional parameters (e.g., directives, inputs, outputs, explained in the following) that define how the script should be executed in the Nextflow pipeline.
The full syntax of a Nextflow process is outlined here. An example process might look something like the following:
// allows you to define processes to be used for modular libraries
nextflow.enable.dsl = 2

workflow {
    ids = Channel.fromPath('data/ids.txt')
    chunksize = Channel.value(1000)
    split_ids(ids, chunksize)
}

process split_ids {
    input:
        path(ids)
        val(chunksize)

    output:
        file('batch-*')

    shell:
        """
        split -l !{chunksize} !{ids} batch-
        """
}
Here, we have a file `ids.txt` containing a large list of IDs, which are to be split into chunks of size 1000. The process will take the `ids.txt` file as input, and output smaller files (`batch-aa`, `batch-ab`, etc.), each containing a separate chunk of 1000 IDs from the original `ids.txt` file.
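To get a feel for what each `split_ids` task actually runs, here is the same `split` command on a toy input (chunk size 3 instead of 1000; the file contents are just for this demo):

```shell
# Toy version of the split_ids shell block: 7 IDs, chunks of 3.
printf 'id%s\n' 1 2 3 4 5 6 7 > ids.txt
split -l 3 ids.txt batch-
ls batch-*          # batch-aa batch-ab batch-ac
wc -l < batch-aa    # 3 (the last chunk, batch-ac, holds the one remaining ID)
```

Nextflow’s `output: file('batch-*')` declaration then picks up exactly these `batch-*` files from the task’s execution directory.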
If you remember from the “Dataflow Programming Model” section, Nextflow Processes consume data through Channels. The Input block of a Process defines which Channels the Process should receive data from.
The Inputs to a Nextflow Process directly influence how many tasks the Process spawns. The process consumes the next element from each Input Channel, spawning a new task execution. It repeats the same logic until one or more input channels have no more content to consume. Nextflow creates an execution directory for each task. The relevant files, logs, standard outputs, etc. for that task are found in each execution directory. This directory is identified by a unique hash that Nextflow generates for each task (see the Tracing & Visualization section of this post).
There are a variety of ways that a Process can define its Inputs to consume Channels. We’ll focus on ways to consume the example Channels we created above.
You can then consume this channel in a process like so:
ch = Channel.of(1, 3, 5, 7)

process basicExample {
    input:
        val x from ch

    shell:
        "echo $x"
}
This process will generate four tasks, each consuming a different element from `ch` and “echoing” that element to standard output.
Consuming from a value channel is no different, except that there is no limit to the number of times a process can consume from it:
value_ch = Channel.value(3.14)
queue_ch = Channel.of(1, 3, 5, 7)

process basicExample {
    input:
        val x from value_ch
        val y from queue_ch

    shell:
        "echo $x $y"
}
This process will generate four tasks, each consuming one element from `queue_ch` and the same element repeatedly from `value_ch`.
There are several ways to pass file inputs into a process. Nextflow will create a symlink to the original file in the relevant task execution directory when a process is handling an input file through the methods described below. The file can then be accessed by the script using the name specified in the input declaration.
Here is an example of how one might process multiple files in a directory in parallel:
proteins = Channel.fromPath('data/*.txt')
process catThemAll {
    input:
        file query_file from proteins

    shell:
        "cat ${query_file}"
}
Each file ending with `.txt` in the `data/` directory will be processed by a separate task generated by the `catThemAll` process. Each task can be identified by a unique task name (e.g., `workflow:catThemAll (2)`) or task_id hash (e.g., `7a/7b3084`). Nextflow will stage a symlink to the specific file that the task will process in the task’s execution directory, e.g., `/work/tasks/7a/7b3084/.file-2.txt`.
Another way to process files is using the `path` qualifier. The `file` and `path` qualifiers are similar, except that the former expects file objects, whereas the latter can also interpret strings as the path of the input file. Note that when using raw strings, the `path` qualifier does not interpret special characters (e.g., wildcards), so this syntax works best if you know the absolute string path of your file. Here is the `path` qualifier in action:
process catThemAll {
    input:
        path x1 from file('data/example-*.txt')
        path x2 from 'file:///absolute/path/to/working-dir/data/ids.txt'

    shell:
        """
        cat ${x1}
        cat ${x2}
        """
}
Assume you had two files under your `data/` directory: `example-1.txt` and `example-2.txt`. Will the above process spawn one task or two? In this case, the path-string qualifier for `x2` behaves more like a Value Channel, allowing the process to consume it infinitely many times. As a result, the process will spawn tasks until `x1` runs out of files, resulting in two tasks.
Similar to Process Inputs, the Output block of a Process defines which Channels the Process should send its results to. For example:
customer_ids = Channel.from(1, 2, 3, 4)

process get_data_for_ids {
    input:
        val id from customer_ids

    output:
        file 'data_for_id_*' into data_for_ids

    shell:
        '''
        echo !{id} > data_for_id_!{id}.txt
        '''
}

data_for_ids.view()
>> /path/to/dir/work/36/1ecd790e4eeb3a786c2e5e288b/data_for_id_3.txt
>> /path/to/dir/work/aa/19b1ac052387cd05ab04021f5f/data_for_id_2.txt
>> /path/to/dir/work/6e/a5ce292656c2c8802108293c97/data_for_id_4.txt
>> /path/to/dir/work/89/0efb6979d54a412274f0ad685d/data_for_id_1.txt
In the above example, the `get_data_for_ids` process sends the files generated by the shell command into the `data_for_ids` channel, which downstream processes can then consume. As with Inputs, anything from values to files to stdout can be output to the channel. For more information on how to leverage the Output block, see the Nextflow documentation.
Finally, processes allow you to define Directives, which are optional settings that affect the execution of the current process. These range from what hardware the process should use, which Docker container to run the process in (if any), and the retry strategy, to how much memory/CPU to allocate for the process and what scripts to run before/after it.

The following is an example of one way you can use Directives to scale up resource allocation for a task based on the number of retries:
proteins = Channel.fromPath('data/*.txt')

process catThemAll {
    errorStrategy 'retry'
    maxRetries 4
    cpus { 3 + task.attempt }
    memory { 5.GB * task.cpus }

    input:
        file query_file from proteins

    shell:
        "cat ${query_file}"
}
There are numerous directives available, outlined here in the documentation.
So far we’ve talked about Channels and Processes, and how Processes can consume from a Channel (via Input declarations) or output to a Channel (via Output declarations). But what if you wanted to transform the contents of a Channel before it gets consumed by a Process? One way to achieve this is by creating another Process that handles the transformation before outputting it into a new channel; however, this can lead to repeated Processes just built for these common transformations. Thankfully, Nextflow offers an easier way to directly create a channel from another channel with the desired transformations occurring in between. The way to achieve this is by using Operators.
Nextflow operators are methods that allow you to connect channels or to transform the values emitted by a channel by applying user-provided rules. Some common reasons we’ve used Operators at 23andMe are for filtering, transforming, splitting, and combining. Building on our example above, here’s how we might leverage the map operator to transform the contents of our channel:
squares = Channel.from(1, 2, 3, 4, 5).map { it * it }
squares.view()
>> 1
>> 4
>> 9
>> 16
>> 25
Another example of how we might flatten a Channel with lists:
flat_list = Channel.from([1, [2, 3]], 4, [5, [6]]).flatten()
flat_list.view()
>> 1
>> 2
>> 3
>> 4
>> 5
>> 6
There are numerous other Operators available for all sorts of Channel transformations, and it would be worthwhile to explore the docs to get a sense of all the options Nextflow offers.
At 23andMe, we use Nextflow to run one of our largest data creation pipelines for product research: Genotype Imputation. Genotype imputation statistically infers the markers of the genome we haven’t observed via our microarrays. (A marker is a DNA sequence with a known physical location on a chromosome.)
Why is this important? Our raw genome data is microarray-based, meaning it covers only a portion of the whole genome. Additionally, our products evolve over time, as do our microarrays. As a result, we have customer data originating from different microarrays that overlap to some degree. For research into new features for our customers, we run genome-wide association studies (GWAS). GWASs are most powerful when run on the whole genome and when able to access the same set of markers across all individuals included. Genotype imputation helps us create a data set with these properties.
At a high level, our genotype imputation workflow consists of several sequential steps.
The following graph is a simplified version of our Nextflow workflow showing the major steps, how many tasks we run per step, and some of the transformations happening in between. The number of tasks run is especially interesting.
As an example, we start the process with a list of 20,000 IDs. Then, controlled by one of our internal parameters, that large list gets split up into subsets of 100 IDs each, and for each of those smaller lists we run the “Reshape input data” step. Therefore, the “Reshape input data” step is run 200 times, once for each split input list. As an output of this step, we get 200 VCF files, each containing all genotype calls for its 100 individuals.
We’ve profiled the individual steps and realized that different steps are cost-effective at different data dimensions. For example, the phasing sweet spot lies in running on a large number of individuals, but a smaller subset of markers. In our case, each phasing step takes in 20,000 individuals, but splits up the chromosomes into smaller region slices to process — 293 regions in total — and that’s our number of tasks for that step.
In contrast, our imputation step takes in only 1000 individuals at a time, but runs on whole chromosomes. Since we impute against 3 different reference panels that cover different characteristics and run a Beagle task for each chromosome (`1, …, 22, X.nonpar, X.par1, X.par2`¹), we see `25 [chromosomes] * 20 [batches] * 3 [panels] = 1500 tasks`. Nextflow manages monitoring all of the tasks for us. It moves the flow forward as soon as all inputs for a specific task are available.
All the statistics and runs shown in this post are based on 1000 Genomes (1KG) Project data. For runs that required more individuals than the 1KG project provides, we’ve multiplied the individual data for scale. As a result, there are slight statistical differences compared to 23andMe customer data production runs, but none that are relevant for the purpose of this post.
The Nextflow architecture allows for a complete separation between the definition of the pipeline and where it is run. The same pipeline script can be run locally or in the cloud by simply changing the `Executor` and some additional parameters in the configuration handed to the Nextflow process at kick-off time. Aside from local execution, Nextflow supports AWS Batch, Kubernetes, and quite a few more.
A prerequisite is that the infrastructure supporting the chosen Executor is set up. To use AWS Batch as the executor, a Job Queue and Compute Environment must exist, and a container must be defined in which all dependencies and scripts needed to run the workflow are installed.
Here is what changes in our configuration file between a local execution running in Docker (top) and one run on AWS Batch (bottom):
# local execution running in docker:
docker.enabled = true
process.container = "<your_image>"

# execution on AWS Batch:
docker.enabled = true
process {
    executor = "awsbatch"
    queue = "<job_queue_arn>"
    container = "<job_definition_name>"
}
aws.region = "<your_region>"
You can see that the most important difference is the `executor` specification. If not provided, Nextflow assumes a local execution. The container is encapsulated in the AWS Job Definition. The AWS Job Queue specifies the compute environment the tasks will be executed on. No changes to the actual workflow are required at all! If you specify a container instead of a job definition name, Nextflow will automatically create the job definition that encapsulates your container for you. We use our own job definition to be able to control a few more parameters.
A benefit that Nextflow gives us for free is staging or even streaming of inputs from S3 to the executors of the respective process steps. All we need to do is define the input S3 location as a Channel and the Nextflow framework makes sure it is going to be available for the task.
We use S3 to preserve and restore our sessions. We trap the EXIT signal of the script from which we run Nextflow and sync the local `.nextflow` directory to S3 as our last action. This directory is where logs, caches, and status are stored for the run. Before calling Nextflow in a subsequent run, that same script restores the contents it finds at the S3 `.nextflow` backup location to the local environment. This gives us the ability to have Nextflow automatically resume a pipeline from the last successful task if it has failed or been interrupted in a previous run. Note that the cache of a run can become quite large, as it stores all intermediary artifacts. Frequent cleanups are a necessity, for example using lifecycle policies that expire objects after a certain time if running and storing data on AWS S3. Another important aspect to think about is the directory structure of the cache: a job should be able to find its own previous cache to resume from without having to read through a lot of data from other jobs.
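As a rough local sketch of that save-and-restore pattern (with `cp` standing in for `aws s3 sync`, and all paths hypothetical):

```shell
# Local stand-in for the S3 session backup: in production, the cp call
# below would be `aws s3 sync .nextflow s3://<backup-bucket>/...`.
workdir=$(mktemp -d)
backup=$(mktemp -d)
(
  cd "$workdir"
  mkdir .nextflow
  echo "session-state" > .nextflow/history   # pretend Nextflow wrote this
  # Trap EXIT so the session directory is backed up as the last action,
  # whether the run succeeds, fails, or is interrupted.
  trap 'cp -r .nextflow "$backup/"' EXIT
  # ... restore of a previous backup + `nextflow run main.nf -resume`
  # would go here ...
)
cat "$backup/.nextflow/history"              # session-state
```

The key point is the `trap … EXIT`: the backup happens no matter how the wrapper script terminates, so the next run always finds the latest session state.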
Nextflow offers valuable tracing and visualizations to help monitor workflows and identify and debug performance bottlenecks. The following examples are taken from a small (manually interrupted) test run of our imputation pipeline on 1KG data with only a few chromosomes.
During execution, we’ve included the options `-with-trace`, `-with-timeline`, and `-with-report` to log additional processing data that can help to tune and debug a pipeline.
You can specify the format of your trace as part of your Nextflow configuration file to include desired metrics. The following is the configuration for our example trace output; it only includes a subset of the possible fields that can be tracked.
trace {
    fields = '''
        task_id,
        hash,
        name,
        status,
        duration,
        %cpu,
        peak_vmem,
        workdir
    '''
}
The output contains a line for each task that has been kicked off with the desired information:
The timeline gives you a visual understanding of how your tasks are run and the resources (time and data transfers) they require. See below for a 30,000-foot view of one of our smaller imputation runs on just three chromosomes. You’ll see a small gap in the upper part where we’ve skipped over a few initial steps to fit into a decent-sized picture.
The report gives you an even more detailed view of all the tasks run, statistics, and resource utilization, and helps with fine-tuning. The following report is from one of our full-scale test runs. You see that a Nextflow head job runs and monitors 5292 tasks in total. You also see that some tasks have failed, but Nextflow has resubmitted them, so the entire workflow was able to succeed. These failures in our case were all results of AWS spot compute terminations, so they were fully “retryable.” The report ends with a table that shows the full trace measures for each task (not shown here).
The most interesting section of the report is the ‘Resource Usage’ section. It can help tune the resources given to each task. Resource usage is visualized for CPU, memory, job duration, and I/O. The following figure shows the CPU usage visualization. The `% Allocated` tab especially helps us understand how close the resource specifications in the workflow are to the actual usage. We can see that in our test run, many processes were actually pretty close to 100% allocation, but there were a few that we could tune. For example, we should look into cutting down the allocated CPUs for the `merge_raw_data` task and increasing them for the `publish` task.
Getting the resource specifications right is immensely important when running a lot of different tasks on the same compute environment, for example AWS Batch, where large instance types will execute multiple tasks. The closer the task specs are to actual usage, the better the utilization of the assigned hardware.
Currently, Nextflow does not provide any built-in way to unit test your pipeline (there is an open ticket for this; maybe you can be the one to add it!). To get around this, we’ve created our own Python library that works as a wrapper around Nextflow, specifically to be used in unit tests for verifying that the outputs of each pipeline step match our expectations.
To see how we use this wrapper library to unit test our pipeline, look here. At a high level, this NextflowUtils Python wrapper kicks off individual pipeline steps through Nextflow and collects their outputs so that tests can assert on them.
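The library itself isn’t shown in this post, but a minimal sketch of the core idea might look like the following (all names here, `build_command`, `run_step`, and the parameter handling, are hypothetical illustrations, not the actual NextflowUtils API):

```python
# Hypothetical sketch of a Nextflow test wrapper -- not the actual
# NextflowUtils API, just the core idea: run a step, assert on outputs.
import subprocess

def build_command(script: str, params: dict) -> list:
    """Translate a dict of pipeline parameters into a `nextflow run` invocation."""
    cmd = ["nextflow", "run", script]
    for key, value in params.items():
        cmd += [f"--{key}", str(value)]
    return cmd

def run_step(script: str, params: dict, workdir: str = "."):
    """Run one pipeline step; a unit test can then assert on the exit
    code and on the files the step produced under its work directory."""
    return subprocess.run(build_command(script, params),
                          cwd=workdir, capture_output=True, text=True)

print(build_command("split_ids.nf", {"chunksize": 1000}))
# ['nextflow', 'run', 'split_ids.nf', '--chunksize', '1000']
```

A test would then call something like `run_step(...)`, check the return code, and compare the produced files against expected fixtures.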
Here are some stats from our production run and the initial scale we ran at:
The following screenshots are just momentary snapshots of what the AWS Batch dashboard looked like at some point during processing.
The following is an overview of the number of vCPUs that are being scaled up and down automatically based on the job pressure over the course of 3 days:
We have capped our compute environment at a maximum of 10K vCPUs, as there are a few more optimizations we would need to consider if we wanted to scale up higher. One example would be working around the AWS request throttling (e.g., S3 slowdowns) that results from a high number of processes running in parallel, all using input files from S3.
At the end of the day, the sky’s the limit. With a few tweaks here and there, we could scale even higher. Nextflow does a lot of work so a developer delivering features does not need to “reinvent the wheel” over and over again, and it comes with many tools to support that.
¹ Go here for a deeper understanding of the split of the X chromosome into `par1` and `par2` regions; `nonpar` comprises the region between `par1` and `par2`.
Big thanks to the authors of the Nextflow library and documentation, which were heavily referenced while writing this blog post.
Anuved Verma is a Software Engineer at 23andMe on the Big Data Platform team. He loves building software to help genetics research at scale.
Anja Bog is an Engineering Manager on the Big Data Platform team. She’s passionate about contributing to a world where we can all live healthier longer.
23andMe is hiring! Check out our current openings!
The 23andMe engineering team builds the world’s foremost personal genetics service with the goal of helping people access, understand, and benefit from the human genome.