
A streaming data library


snoyberg/conduit


Conduit is a framework for dealing with streaming data, such as reading raw bytes from a file, parsing a CSV response body from an HTTP request, or performing an action on all files in a directory tree. It standardizes various interfaces for streams of data, and provides a consistent interface for transforming, manipulating, and consuming that data.

Some of the reasons you'd like to use conduit are:

  • Constant memory usage over large data
  • Deterministic resource usage (e.g., promptly close file handles)
  • Easily combine different data sources (HTTP, files) with data consumers (XML/CSV processors)

Want more motivation on why to use conduit? Check out this presentation on conduit. Feel free to ignore the Yesod section.

NOTE As of March 2018, this document has been updated to be compatible with version 1.3 of conduit. This is available in Long Term Support (LTS) Haskell version 11 and up. For more information on changes between versions 1.2 and 1.3, see the changelog.

Table of Contents

  1. Synopsis
  2. Libraries
  3. Conduit as a bad list
  4. Interleaved effects
  5. Terminology and concepts
  6. Folds
  7. Transformations
  8. Monadic composition
  9. Primitives
  10. Evaluation strategy
  11. Resource allocation
  12. Chunked data
  13. ZipSink
  14. ZipSource
  15. ZipConduit
  16. Forced consumption
  17. FAQs
  18. More exercises
  19. Legacy syntax
  20. Further reading

Synopsis

Basic examples of conduit usage, much more to follow!

```haskell
#!/usr/bin/env stack
-- stack script --resolver lts-12.21
import Conduit

main = do
    -- Pure operations: summing numbers.
    print $ runConduitPure $ yieldMany [1..10] .| sumC

    -- Exception safe file access: copy a file.
    writeFile "input.txt" "This is a test." -- create the source file
    runConduitRes $ sourceFileBS "input.txt" .| sinkFile "output.txt" -- actual copying
    readFile "output.txt" >>= putStrLn -- prove that it worked

    -- Perform transformations.
    print $ runConduitPure $ yieldMany [1..10] .| mapC (+ 1) .| sinkList
```

Libraries

There are a large number of packages relevant to conduit; just search for "conduit" on the LTS Haskell package list page. In this tutorial, we're going to rely mainly on the conduit library itself, which provides a large number of common functions built in. There is also the conduit-extra library, which adds some common extra support, like GZIP (de)compression.

You can run the examples in this tutorial as Stack scripts.

Conduit as a bad list

Let's start off by comparing conduit to normal lists. We'll be able to compare and contrast with functions you're already used to working with.

```haskell
#!/usr/bin/env stack
-- stack script --resolver lts-12.21
{-# LANGUAGE ExtendedDefaultRules #-}
import Conduit

take10List :: IO ()
take10List = print
    $ take 10 [1..]

take10Conduit :: IO ()
take10Conduit = print $ runConduitPure
    $ yieldMany [1..] .| takeC 10 .| sinkList

main :: IO ()
main = do
    putStrLn "List version:"
    take10List
    putStrLn ""
    putStrLn "Conduit version:"
    take10Conduit
```

Our list function is pretty straightforward: create an infinite list starting at 1, take the first 10 elements, and then print the list. The conduit version does exactly the same thing, but:

  • In order to convert the [1..] list into a conduit, we use the yieldMany function. (And note that, like lists, conduit has no problem dealing with infinite streams.)
  • We're not just doing function composition, and therefore we need to use the .| composition operator. This combines multiple components of a conduit pipeline together.
  • Instead of take, we use takeC. The Conduit module provides many functions matching common list functions, but appends a C to disambiguate the names. (If you'd prefer to use a qualified import, check out Data.Conduit.Combinators.)
  • To consume all of our results back into a list, we use sinkList.
  • We need to explicitly run our conduit pipeline to get a result from it. Since we're running a pure pipeline (no monadic effects), we can use runConduitPure.
  • And finally, the data flows from left to right in the conduit composition, as opposed to right to left in normal function composition. There's nothing deep to this; it's just intended to make conduit feel more like common streaming abstractions from other places. For example, notice how similar the code above looks to piping in a Unix shell: ps | grep ghc | wc -l.
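As a sketch of the qualified-import style mentioned above: Data.Conduit.Combinators exports the same combinators without the C suffix, while runConduitPure and .| come from Data.Conduit. The alias C below is just a naming choice, not something the library requires.

```haskell
#!/usr/bin/env stack
-- stack script --resolver lts-12.21
import Data.Conduit (runConduitPure, (.|))
import qualified Data.Conduit.Combinators as C

-- Same pipeline as take10Conduit, written with qualified names:
-- C.take plays the role of takeC, and C.sinkList the role of sinkList.
take10Qualified :: [Int]
take10Qualified = runConduitPure $ C.yieldMany [1 ..] .| C.take 10 .| C.sinkList

main :: IO ()
main = print take10Qualified
```

Which style you pick is mostly taste; the qualified form avoids the C suffix at the cost of a prefix on every call.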

Alright, so what we've established is that we can use conduit as a bad, inconvenient version of lists. Don't worry, we'll soon start to see cases where conduit far outshines lists, but we're not quite there yet. Let's build up a slightly more complex pipeline:

```haskell
#!/usr/bin/env stack
-- stack script --resolver lts-12.21
{-# LANGUAGE ExtendedDefaultRules #-}
import Conduit

complicatedList :: IO ()
complicatedList = print
    $ takeWhile (< 18) $ map (* 2) $ take 10 [1..]

complicatedConduit :: IO ()
complicatedConduit = print $ runConduitPure
     $ yieldMany [1..]
    .| takeC 10
    .| mapC (* 2)
    .| takeWhileC (< 18)
    .| sinkList

main :: IO ()
main = do
    putStrLn "List version:"
    complicatedList
    putStrLn ""
    putStrLn "Conduit version:"
    complicatedConduit
```

Nothing more magical is going on; we're just looking at more functions. For our last bad-list example, let's move from a pure pipeline to one that performs some side effects. Instead of printing the whole result list, let's use mapM_C to print each value individually.

```haskell
#!/usr/bin/env stack
-- stack script --resolver lts-12.21
{-# LANGUAGE ExtendedDefaultRules #-}
import Conduit

complicatedList :: IO ()
complicatedList = mapM_ print
    $ takeWhile (< 18) $ map (* 2) $ take 10 [1..]

complicatedConduit :: IO ()
complicatedConduit = runConduit
     $ yieldMany [1..]
    .| takeC 10
    .| mapC (* 2)
    .| takeWhileC (< 18)
    .| mapM_C print

main :: IO ()
main = do
    putStrLn "List version:"
    complicatedList
    putStrLn ""
    putStrLn "Conduit version:"
    complicatedConduit
```

For the list version, all we've done is add mapM_ at the beginning. In the conduit version, we replaced print $ runConduitPure with runConduit (since we're no longer generating a result to print, and our pipeline now has effects), and replaced sinkList with mapM_C print. We're no longer reconstructing a list at the end, instead just streaming the values one at a time into the print function.

Interleaved effects

Let's make things a bit more difficult for lists. We've played to their strengths until now, having a pure series of functions composed, and then only performing effects at the end (either print or mapM_ print). Suppose we have some new function:

```haskell
magic :: Int -> IO Int
magic x = do
    putStrLn $ "I'm doing magic with " ++ show x
    return $ x * 2
```

And we want to use this in place of the map (* 2) that we were doing before. Let's see how the list and conduit versions adapt:

```haskell
#!/usr/bin/env stack
-- stack script --resolver lts-12.21
{-# LANGUAGE ExtendedDefaultRules #-}
import Conduit

magic :: Int -> IO Int
magic x = do
    putStrLn $ "I'm doing magic with " ++ show x
    return $ x * 2

magicalList :: IO ()
magicalList =
    mapM magic (take 10 [1..]) >>=
    mapM_ print . takeWhile (< 18)

magicalConduit :: IO ()
magicalConduit = runConduit
     $ yieldMany [1..]
    .| takeC 10
    .| mapMC magic
    .| takeWhileC (< 18)
    .| mapM_C print

main :: IO ()
main = do
    putStrLn "List version:"
    magicalList
    putStrLn ""
    putStrLn "Conduit version:"
    magicalConduit
```

Notice how different the list version looks: we needed to break out >>= to allow us to have two different side-effecting actions (mapM magic and mapM_ print). Meanwhile, in conduit, all we did was replace mapC (* 2) with mapMC magic. This is where we begin to see the strength of conduit: it allows us to build up large pipelines of components, and each of those components can be side-effecting!

However, we're not done with the differences yet. Try to guess what the output will be, and then ideally run it on your machine and see if you're correct. For those who won't be running it, here's the output:

```
List version:
I'm doing magic with 1
I'm doing magic with 2
I'm doing magic with 3
I'm doing magic with 4
I'm doing magic with 5
I'm doing magic with 6
I'm doing magic with 7
I'm doing magic with 8
I'm doing magic with 9
I'm doing magic with 10
2
4
6
8
10
12
14
16

Conduit version:
I'm doing magic with 1
2
I'm doing magic with 2
4
I'm doing magic with 3
6
I'm doing magic with 4
8
I'm doing magic with 5
10
I'm doing magic with 6
12
I'm doing magic with 7
14
I'm doing magic with 8
16
I'm doing magic with 9
```

In the list version, we apply the magic function to all 10 elements in the initial list, printing all the output at once and generating a new list. We then use takeWhile on this new list and exclude the values 18 and 20. Finally, we print out each element in our new 8-value list. This has a number of downsides:

  • We had to force all 10 items of the list into memory at once. For 10 items, not a big deal. But if we were dealing with massive amounts of data, this could cripple our program.
  • We did "more magic" than was strictly necessary: we applied magic to all 10 items in the list. However, our takeWhile knew when it looked at the 9th result that it was going to ignore the rest of the list. Nonetheless, because our two components (magic and takeWhile) are separate from each other, magic had no way to know that its later results would be thrown away.

Let's compare that to the conduit version:

  • From the output, we can see that the calls to magic are interleaved with the calls to print. This shows that our data flows through the whole pipeline one element at a time, and never needs to build up an intermediate list. In other words, we get constant memory usage in this pipeline, a huge selling point for conduit.
  • Notice that we only perform "magic" 9 times: once we run magic on 9, get a result of 18, and find out that it fails our takeWhileC (< 18), the conduit pipeline doesn't demand any more values, and therefore magic isn't run again. We'll describe in more detail later how conduit is consumer-driven, but this is your first taste of it.

To be clear, it's entirely possible to get this behavior with a list-based program. What you'll lose is easy composition. For example, here's one way to get the same behavior as was achieved with conduit:

```haskell
#!/usr/bin/env stack
-- stack script --resolver lts-12.21
magic :: Int -> IO Int
magic x = do
    putStrLn $ "I'm doing magic with " ++ show x
    return $ x * 2

main :: IO ()
main = do
    let go [] = return ()
        go (x:xs) = do
            y <- magic x
            if y < 18
                then do
                    print y
                    go xs
                else return ()

    go $ take 10 [1..]
```

Notice how we've had to reimplement the behavior of takeWhile, mapM, and mapM_ ourselves, and the solution is less compositional. Conduit makes it easy to get the right behavior: interleaved effects, constant memory, and (as we'll see later) deterministic resource usage.

Terminology and concepts

Let's take a step back from the code and discuss some terminology and concepts in conduit. Conduit deals with streams of data. Each component of a pipeline can consume data from upstream, and produce data to send downstream. For example:

```haskell
runConduit $ yieldMany [1..10] .| mapC show .| mapM_C print
```

In this snippet, yieldMany [1..10], mapC show, and mapM_C print are each components. We use the .| operator (a synonym for the fuse function) to compose these components into a pipeline. Then we run that pipeline with runConduit.

From the perspective of mapC show, yieldMany [1..10] is its upstream, and mapM_C print is its downstream. When we look at yieldMany [1..10] .| mapC show, what we're actually doing is combining these two components into a larger component. Let's look at the streams involved:

  • yieldMany consumes nothing from upstream, and produces a stream of Ints
  • mapC show consumes a stream of Ints, and produces a stream of Strings
  • When we combine these two components together, we get something which consumes nothing from upstream, and produces a stream of Strings.

To add some type signatures into this:

```haskell
yieldMany [1..10] :: ConduitT () Int IO ()
mapC show         :: ConduitT Int String IO ()
```

There are four type parameters to ConduitT:

  • The first indicates the upstream value, or input. For yieldMany, we're using (), though really it could be any type since we never read anything from upstream. For mapC, it's Int.
  • The second indicates the downstream value, or output. For yieldMany, this is Int. Notice how this matches the input of mapC, which is what lets us combine these two. The output of mapC is String.
  • The third indicates the base monad, which tells us what kinds of effects we can perform. A ConduitT is a monad transformer, so you can use lift to perform effects. (We'll learn more about conduit's monadic nature later.) We're using IO in our example.
  • The final one indicates the result type of the component. This is typically only used for the most downstream component in a pipeline. We'll get into this when we discuss folds below.

Let's also look at the type of our .| operator:

```haskell
(.|) :: Monad m => ConduitT a b m () -> ConduitT b c m r -> ConduitT a c m r
```

This shows us that:

  • The output of the first component must match the input of the second
  • We ignore the result type from the first component, and keep the result of the second
  • The combined component consumes the same type as the first component and produces the same type as the second component
  • Everything has to run in the same base monad

EXERCISE Work through what happens when we add .| mapM_C print to the mix above.

Finally, let's look at the type of the runConduit function:

```haskell
runConduit :: Monad m => ConduitT () Void m r -> m r
```

This gives us a better idea of what a pipeline is: just a self-contained component, which consumes nothing from upstream (denoted by ()) and produces nothing downstream (denoted by Void)*. When we have such a stand-alone component, we can run it to extract a monadic action that will return a result (the m r).

* The choice of () and Void instead of, say, both () or both Void, is complicated. For now, I recommend just accepting that this makes sense. The short explanation is that the input is in negative position whereas the output is in positive position, and therefore we can give the stronger Void guarantee in the output case. The long explanation can be found here.
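One way to check your answer to the exercise above is to spell out every type and let GHC confirm it. The sketch below names each stage (source, toStrings, and printer are illustrative names, not library functions) and annotates it. The fully assembled pipeline has input () and output Void, which is exactly what runConduit accepts. Void is imported from base's Data.Void here in case the Conduit module doesn't re-export it.

```haskell
#!/usr/bin/env stack
-- stack script --resolver lts-12.21
import Conduit
import Data.Void (Void)

-- Produces Ints and never reads upstream, so its input type can stay polymorphic.
source :: Monad m => ConduitT i Int m ()
source = yieldMany [1..10]

-- Consumes Ints and produces Strings.
toStrings :: Monad m => ConduitT Int String m ()
toStrings = mapC show

-- Consumes Strings and never yields, so its output type can stay polymorphic.
-- Note that print on a String shows it with surrounding quotes.
printer :: ConduitT String o IO ()
printer = mapM_C print

-- Input () and output Void: a complete pipeline, ready for runConduit.
pipeline :: ConduitT () Void IO ()
pipeline = source .| toStrings .| printer

main :: IO ()
main = runConduit pipeline
```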

We talked about pure pipelines earlier. Those are just pipelines with Identity as the base monad:

```haskell
runConduitPure :: ConduitT () Void Identity r -> r
```
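Comparing the two signatures, runConduitPure should behave like runConduit specialized to Identity, followed by runIdentity. A small sketch that runs the same pipeline both ways (runIdentity comes from base's Data.Functor.Identity):

```haskell
#!/usr/bin/env stack
-- stack script --resolver lts-12.21
import Conduit
import Data.Functor.Identity (runIdentity)

main :: IO ()
main = do
    -- Run in the Identity monad explicitly, then unwrap the result.
    let viaIdentity = runIdentity $ runConduit $ yieldMany [1..10 :: Int] .| sumC
    -- Let runConduitPure do the unwrapping for us.
        viaPure = runConduitPure $ yieldMany [1..10 :: Int] .| sumC
    print (viaIdentity, viaPure)
```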

Folds

A common activity with lists is folding down to a single result. This concept translates directly into conduit, and works nicely at ensuring constant memory usage. If you're familiar with folding over lists, the concepts here should be pretty straightforward, so this will mostly just be a collection of examples.

```haskell
#!/usr/bin/env stack
-- stack script --resolver lts-12.21
import Conduit

main :: IO ()
main = print $ runConduitPure $ yieldMany [1..100 :: Int] .| sumC
```

Summing is straightforward, and can be done if desired with the foldlC function:

```haskell
#!/usr/bin/env stack
-- stack script --resolver lts-12.21
import Conduit

main :: IO ()
main = print $ runConduitPure $ yieldMany [1..100 :: Int] .| foldlC (+) 0
```
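foldlC isn't limited to a single number for its accumulator. As a sketch, a hypothetical meanC helper (not part of conduit) can carry a running total and a count in a pair and compute the mean in one pass, still in constant memory. Both fields are forced with seq at each step so the tuple doesn't accumulate thunks.

```haskell
#!/usr/bin/env stack
-- stack script --resolver lts-12.21
import Conduit

-- Hypothetical helper: the mean of all upstream values, or Nothing if the
-- stream is empty. The accumulator is (running total, element count).
meanC :: Monad m => ConduitT Double o m (Maybe Double)
meanC = do
    (total, count) <- foldlC step (0, 0 :: Int)
    return $ if count == 0
        then Nothing
        else Just (total / fromIntegral count)
  where
    -- Force both fields so no chain of (+) thunks builds up inside the pair.
    step (t, c) x =
        let t' = t + x
            c' = c + 1
        in t' `seq` c' `seq` (t', c')

main :: IO ()
main = print $ runConduitPure $ yieldMany [1..100] .| meanC
```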

You can use foldMapC to fold monoids together:

```haskell
#!/usr/bin/env stack
-- stack script --resolver lts-12.21
import Conduit
import Data.Monoid (Sum (..))

main :: IO ()
main = print $ getSum $ runConduitPure $ yieldMany [1..100 :: Int] .| foldMapC Sum
```

Or you can use foldC as a shortened form of foldMapC id:

```haskell
#!/usr/bin/env stack
-- stack script --resolver lts-12.21
import Conduit

main :: IO ()
main = putStrLn $ runConduitPure
     $ yieldMany [1..10 :: Int]
    .| mapC (\i -> show i ++ "\n")
    .| foldC
```

If you want to make that easier, you can use unlinesC:

```haskell
#!/usr/bin/env stack
-- stack script --resolver lts-12.21
import Conduit

main :: IO ()
main = putStrLn $ runConduitPure
     $ yieldMany [1..10 :: Int]
    .| mapC show
    .| unlinesC
    .| foldC
```

You can also do monadic folds:

```haskell
#!/usr/bin/env stack
-- stack script --resolver lts-12.21
import Conduit
import Data.Monoid (Product (..))

magic :: Int -> IO (Product Int)
magic i = do
    putStrLn $ "Doing magic on " ++ show i
    return $ Product i

main :: IO ()
main = do
    Product res <- runConduit $ yieldMany [1..10] .| foldMapMC magic
    print res
```

Or with foldMC:

```haskell
#!/usr/bin/env stack
-- stack script --resolver lts-12.21
import Conduit

magic :: Int -> Int -> IO Int
magic total i = do
    putStrLn $ "Doing magic on " ++ show i
    return $! total * i

main :: IO ()
main = do
    res <- runConduit $ yieldMany [1..10] .| foldMC magic 1
    print res
```

There are plenty of other functions available in the Data.Conduit.Combinators module (which the Conduit module re-exports with a C suffix). We won't be covering all of them in this tutorial, but hopefully this crash course will give you an idea of what kinds of things you can do and help you understand the API docs.

Transformations

When learning lists, one of the first functions you'll see is map, which transforms each element of the list. We've already seen mapC above, which does the same thing for conduit. This is just one of many functions available for performing transformations. Like folds, these functions are named and behave like their list counterparts in many cases, so we'll just blast through some examples.

```haskell
#!/usr/bin/env stack
-- stack script --resolver lts-12.21
import Conduit

main :: IO ()
main = runConduit $ yieldMany [1..10] .| mapC (* 2) .| mapM_C print
```

We can also filter out values:

```haskell
#!/usr/bin/env stack
-- stack script --resolver lts-12.21
import Conduit

main :: IO ()
main = runConduit $ yieldMany [1..10] .| filterC even .| mapM_C print
```

Or, if desired, we can insert a value between each pair of values in the stream:

```haskell
#!/usr/bin/env stack
-- stack script --resolver lts-12.21
import Conduit

main :: IO ()
main = runConduit $ yieldMany [1..10] .| intersperseC 0 .| mapM_C print
```

It's also possible to "flatten out" a conduit by converting a stream of chunks of data (like lists or vectors) into the individual values.

```haskell
#!/usr/bin/env stack
-- stack script --resolver lts-12.21
import Conduit

main :: IO ()
main = runConduit
     $ yieldMany (map (replicate 5) [1..10])
    .| concatC
    .| mapM_C print
```

NOTE This is our first exposure to "chunked data" in conduit. This is actually a very important and common use case, especially around ByteStrings and Texts. We'll cover it in much more detail in its own section later.

You can also perform monadic actions while transforming. We've seen mapMC being used already, but other such functions exist:

```haskell
#!/usr/bin/env stack
-- stack script --resolver lts-12.21
{-# LANGUAGE OverloadedStrings #-}
import Conduit

evenM :: Int -> IO Bool
evenM i = do
    let res = even i
    print (i, res)
    return res

main :: IO ()
main = runConduit
     $ yieldMany [1..10]
    .| filterMC evenM
    .| mapM_C print
```

Or you can use the iterMC function, which performs a monadic action on the upstream values without modifying them:

```haskell
#!/usr/bin/env stack
-- stack script --resolver lts-12.21
import Conduit

main :: IO ()
main = do
    res <- runConduit
         $ yieldMany [1..10]
        .| iterMC print
        .| sumC
    print res
```

EXERCISE Implement iterMC in terms of mapMC.
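If you'd like to check your work afterwards, here is one possible answer, sketched under a hypothetical name myIterMC: run the action for its effect, then hand the original value back to mapMC so it flows downstream unchanged.

```haskell
#!/usr/bin/env stack
-- stack script --resolver lts-12.21
import Conduit

-- Hypothetical reimplementation of iterMC: perform the effect, then pass
-- the original value through untouched.
myIterMC :: Monad m => (i -> m ()) -> ConduitT i i m ()
myIterMC f = mapMC (\x -> f x >> return x)

main :: IO ()
main = do
    res <- runConduit
         $ yieldMany [1..10 :: Int]
        .| myIterMC print
        .| sumC
    print res
```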

Monadic composition

We've so far only really explored half of the power of conduit: being able to combine multiple components together by connecting the output of the upstream to the input of the downstream (via the .| operator or the fuse function). However, there's another way to combine simple conduits into more complex ones, using the standard monadic interface (or do-notation). Let's start with some examples, beginning with a data producer:

```haskell
#!/usr/bin/env stack
-- stack script --resolver lts-12.21
import Conduit

source :: Monad m => ConduitT i Int m ()
source = do
    yieldMany [1..10]
    yieldMany [11..20]

main :: IO ()
main = runConduit $ source .| mapM_C print
```

We've created a new conduit, source, which combines two calls to yieldMany. Try to guess intuitively what this will do before reading the explanation.

As you may have guessed, this program will print the numbers 1 through 20. What we've seen here is that, when you use monadic composition, the output from the first component is sent downstream, and then the output from the second component is sent downstream. Now let's look at the consuming side. Again, try to guess what this program will do before you read the explanation following it.

```haskell
#!/usr/bin/env stack
-- stack script --resolver lts-12.21
import Conduit

sink :: Monad m => ConduitT Int o m (String, Int)
sink = do
    x <- takeC 5 .| mapC show .| foldC
    y <- sumC
    return (x, y)

main :: IO ()
main = do
    let res = runConduitPure $ yieldMany [1..10] .| sink
    print res
```

Let's first analyze takeC 5 .| mapC show .| foldC. This bit will take 5 elements from the stream, convert them to Strings, and then combine those Strings into one String. So if we actually have 10 elements on the stream, what happens to the other 5? Well, up until now, the answer would have been "disappears into the aether." However, we've now introduced monadic composition. In this world, those values are still sitting on the stream, ready to be consumed by whatever comes next. In our case, that's sumC.

EXERCISE Rewrite sink to not use do-notation. Hint: it'll be easier to go Applicative.
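One possible answer, as a sketch: since the second component doesn't depend on the result of the first, Applicative is enough. The parentheses matter, because .| binds more loosely than <$> and <*>.

```haskell
#!/usr/bin/env stack
-- stack script --resolver lts-12.21
import Conduit

-- Runs the first component, then the second, and pairs up their results,
-- just like the do-notation version.
sink :: Monad m => ConduitT Int o m (String, Int)
sink = (,) <$> (takeC 5 .| mapC show .| foldC) <*> sumC

main :: IO ()
main = print $ runConduitPure $ yieldMany [1..10] .| sink
```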

So we've seen how monadic composition works with both upstream and downstream, but in isolation. We can just as easily combine these two concepts together, and create a transformer using monadic composition.

```haskell
#!/usr/bin/env stack
-- stack script --resolver lts-12.21
import Conduit

trans :: Monad m => ConduitT Int Int m ()
trans = do
    takeC 5 .| mapC (+ 1)
    mapC (* 2)

main :: IO ()
main = runConduit $ yieldMany [1..10] .| trans .| mapM_C print
```

Here, we've set up a conduit that takes the first 5 values it's given, adds 1 to each, and sends the result downstream. Then, it takes everything else, multiplies it by 2, and sends it downstream.

EXERCISE Modify trans so that it does something different for the first 3, second 3, and final 3 values from upstream, and drops all other values.
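A sketch of one possible answer: the three transformations (+ 1), (* 2), and subtract 1 are arbitrary choices, and the trailing sinkNull consumes and discards anything left over. Leaving sinkNull off would also drop the remaining values, but by never requesting them from upstream rather than by consuming them.

```haskell
#!/usr/bin/env stack
-- stack script --resolver lts-12.21
import Conduit

trans :: Monad m => ConduitT Int Int m ()
trans = do
    takeC 3 .| mapC (+ 1)         -- first three values
    takeC 3 .| mapC (* 2)         -- next three values
    takeC 3 .| mapC (subtract 1)  -- next three after that
    sinkNull                      -- consume and discard everything else

main :: IO ()
main = runConduit $ yieldMany [1..10] .| trans .| mapM_C print
```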

The only restriction we have in monadic composition is exactly what you'd expect from the types: the first three type parameters (input, output, and monad) must be the same for all components.

Primitives

We've worked with high-level functions in conduit so far. However, at its core conduit is built on top of a number of simple primitives. Combined with monadic composition, we can build up all of the more advanced functions from these primitives. Let's start with the one you'd most likely expect: yield. It's just like the yieldMany function we've been using until now, except it works on a single value instead of a collection of them.

```haskell
#!/usr/bin/env stack
-- stack script --resolver lts-12.21
import Conduit

main :: IO ()
main = runConduit $ yield 1 .| mapM_C print
```

Of course, we're not limited to using just a single call to yield:

```haskell
#!/usr/bin/env stack
-- stack script --resolver lts-12.21
import Conduit

main :: IO ()
main = runConduit $ (yield 1 >> yield 2) .| mapM_C print
```

EXERCISE Reimplement yieldMany for lists using the yield primitive and monadic composition.
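One possible answer, sketched under the hypothetical name myYieldMany: walk the list and yield each element in turn, relying on monadic composition to sequence the yields. The shorter definition mapM_ yield does the same thing.

```haskell
#!/usr/bin/env stack
-- stack script --resolver lts-12.21
import Conduit

-- Hypothetical list-only version of yieldMany, built from yield.
myYieldMany :: Monad m => [o] -> ConduitT i o m ()
myYieldMany [] = return ()
myYieldMany (x:xs) = do
    yield x          -- send this element downstream
    myYieldMany xs   -- then send the rest

main :: IO ()
main = runConduit $ myYieldMany [1..10 :: Int] .| mapM_C print
```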

Given that yield sends an output value downstream, we also need a function to get an input value from upstream. For that, we'll use await. Let's start really simple:

```haskell
#!/usr/bin/env stack
-- stack script --resolver lts-12.21
{-# LANGUAGE ExtendedDefaultRules #-}
import Conduit

main :: IO ()
main = do
    -- prints: Just 1
    print $ runConduitPure $ yield 1 .| await
    -- prints: Nothing
    print $ runConduitPure $ yieldMany [] .| await

    -- Note, that the above is equivalent to the following. Work out
    -- why this works:
    print $ runConduitPure $ return () .| await
    print $ runConduitPure await
```

await will ask for a value from upstream, and return a Just if there is a value available. If not, it will return a Nothing.

NOTE I was specific in my phrasing of "await will ask." This has to do with the evaluation of a conduit pipeline, and how it is driven by downstream. We'll cover this in more detail in the next section.

Of course, things get much more interesting when we combine both yield and await together. For example, we can implement our own mapC function:

```haskell
#!/usr/bin/env stack
-- stack script --resolver lts-12.21
import Conduit

myMapC :: Monad m => (i -> o) -> ConduitT i o m ()
myMapC f =
    loop
  where
    loop = do
        mx <- await
        case mx of
            Nothing -> return ()
            Just x -> do
                yield (f x)
                loop

main :: IO ()
main = runConduit $ yieldMany [1..10] .| myMapC (+ 1) .| mapM_C print
```

EXERCISE Try implementing filterC and mapMC. For the latter, you'll need to use the lift function.
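One possible answer, following the same await/yield loop as myMapC above. The names myFilterC, myMapMC, and the noisyTimesTen helper are hypothetical. lift is imported explicitly from transformers in case the Conduit module doesn't re-export it; it runs an action of the base monad inside the conduit.

```haskell
#!/usr/bin/env stack
-- stack script --resolver lts-12.21
import Conduit
import Control.Monad (when)
import Control.Monad.Trans.Class (lift)

-- Only pass along values that satisfy the predicate.
myFilterC :: Monad m => (i -> Bool) -> ConduitT i i m ()
myFilterC p = loop
  where
    loop = do
        mx <- await
        case mx of
            Nothing -> return ()
            Just x -> do
                when (p x) (yield x)
                loop

-- Run a base-monad action on each value and yield its result.
myMapMC :: Monad m => (i -> m o) -> ConduitT i o m ()
myMapMC f = loop
  where
    loop = do
        mx <- await
        case mx of
            Nothing -> return ()
            Just x -> do
                y <- lift (f x)
                yield y
                loop

-- Hypothetical effectful step, so we can watch myMapMC run.
noisyTimesTen :: Int -> IO Int
noisyTimesTen x = do
    putStrLn ("multiplying " ++ show x)
    return (x * 10)

main :: IO ()
main = runConduit
     $ yieldMany [1..10 :: Int]
    .| myFilterC even
    .| myMapMC noisyTimesTen
    .| mapM_C print
```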

The next primitive requires a little motivation. Let's look at a simple example of using the takeWhileC function:

```haskell
#!/usr/bin/env stack
-- stack script --resolver lts-12.21
import Conduit

main :: IO ()
main = print $ runConduitPure $ yieldMany [1..10] .| do
    x <- takeWhileC (<= 5) .| sinkList
    y <- sinkList
    return (x, y)
```

As you may guess, this will result in the output ([1,2,3,4,5],[6,7,8,9,10]). Awesome. Let's go ahead and try to implement our own takeWhileC with just await and yield.

```haskell
#!/usr/bin/env stack
-- stack script --resolver lts-12.21
import Conduit

myTakeWhileC :: Monad m => (i -> Bool) -> ConduitT i i m ()
myTakeWhileC f =
    loop
  where
    loop = do
        mx <- await
        case mx of
            Nothing -> return ()
            Just x
                | f x -> do
                    yield x
                    loop
                | otherwise -> return ()

main :: IO ()
main = print $ runConduitPure $ yieldMany [1..10] .| do
    x <- myTakeWhileC (<= 5) .| sinkList
    y <- sinkList
    return (x, y)
```

I'd recommend looking over myTakeWhileC and making sure you're comfortable with what it's doing. When you've done that, run the program and compare the output. To make it easier, here is the output of the original (with the real takeWhileC) vs. this program:

```
takeWhileC: ([1,2,3,4,5],[6,7,8,9,10])
myTakeWhileC: ([1,2,3,4,5],[7,8,9,10])
```

What happened to 6? Well, in the otherwise branch of the case statement, we've determined that the value we received from upstream does not match our predicate function f. So what do we do with it? Well, we just throw it away! In our program, the first value to fail the predicate is 6, so it's discarded, and then our second sinkList usage grabs the next value, which is 7.

What we need is a primitive that lets us put a value back on the stream. And we have one that does just that: leftover. Let's fix up our myTakeWhileC:

```haskell
#!/usr/bin/env stack
-- stack script --resolver lts-12.21
import Conduit

myGoodTakeWhileC :: Monad m => (i -> Bool) -> ConduitT i i m ()
myGoodTakeWhileC f =
    loop
  where
    loop = do
        mx <- await
        case mx of
            Nothing -> return ()
            Just x
                | f x -> do
                    yield x
                    loop
                | otherwise -> leftover x

main :: IO ()
main = print $ runConduitPure $ yieldMany [1..10] .| do
    x <- myGoodTakeWhileC (<= 5) .| sinkList
    y <- sinkList
    return (x, y)
```

As expected, this has the same output as using the real takeWhileC function.

EXERCISE Implement a peek function that gets the next value from upstream, if available, and then puts it back on the stream.
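A sketch of one possible answer, under the hypothetical name myPeek: await a value and, if there was one, immediately hand it back with leftover so the next consumer sees it again.

```haskell
#!/usr/bin/env stack
-- stack script --resolver lts-12.21
import Conduit

-- Look at the next upstream value without consuming it.
myPeek :: Monad m => ConduitT i o m (Maybe i)
myPeek = do
    mx <- await
    case mx of
        Nothing -> return Nothing
        Just x -> do
            leftover x       -- put it back for whoever reads next
            return (Just x)

main :: IO ()
main = print $ runConduitPure $ yieldMany [1..5 :: Int] .| do
    peeked <- myPeek
    everything <- sinkList  -- still sees the peeked value first
    return (peeked, everything)
```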

We can also call leftover as many times as we want, and even use values that didn't come from upstream, though this is a fairly unusual use case. Just to prove it's possible though:

```haskell
#!/usr/bin/env stack
-- stack script --resolver lts-12.21
import Conduit

main :: IO ()
main = print $ runConduitPure $ return () .| do
    mapM_ leftover [1..10]
    sinkList
```

There are two semi-advanced concepts to get across in this example:

  1. If you run this, the result is a descending list from 10 to 1. This is because leftover works in a LIFO (last in, first out) fashion.
  2. If you take off the return () .| bit, this example will fail to compile. That's because, by using leftover, we've stated that our conduit actually takes some input from upstream. If you remember, when you use runConduitPure, the complete pipeline cannot expect any input (it must have an input of type ()). Adding return () .| says "we're connecting you to an empty upstream component" to satisfy the type system.

Evaluation strategy

Let's talk about the evaluation strategy of a conduit pipeline. The most important thing to remember is that everything is driven by downstream. To see what I mean, consider this example:

```haskell
#!/usr/bin/env stack
-- stack script --resolver lts-12.21
import Conduit

main :: IO ()
main = runConduit
     $ yieldMany [1..10]
    .| iterMC print
    .| return ()
```

This program will generate no output. The reason is that the most downstream component is return (), which never awaits any values from upstream and immediately exits. Once it exits, the entire pipeline exits. As a result, the two upstream components are never run at all. If you wanted to instead force all of the values and just discard them, you could use sinkNull:

```haskell
#!/usr/bin/env stack
-- stack script --resolver lts-12.21
import Conduit

main :: IO ()
main = runConduit
     $ yieldMany [1..10]
    .| iterMC print
    .| sinkNull
```

Now try and guess what the following program outputs:

```haskell
#!/usr/bin/env stack
-- stack script --resolver lts-12.21
import Conduit

main :: IO ()
main = runConduit
     $ yieldMany [1..10]
    .| iterMC print
    .| return ()
    .| sinkNull
```

Answer: nothing! The sinkNull will await all values from its immediate upstream. But its immediate upstream is return (), which never yields any value, causing the sinkNull to exit immediately.

Alright, let's tweak this slightly. What will this one output?

```haskell
#!/usr/bin/env stack
-- stack script --resolver lts-12.21
import Conduit

main :: IO ()
main = runConduit
     $ yieldMany [1..10]
    .| iterMC print
    .| liftIO (putStrLn "I was called")
    .| sinkNull
```

In this case, sinkNull calls await, which forces execution to defer to the next upstream component (the liftIO ... bit). In order to see if it yields, that component must be evaluated until it either (1) exits, (2) yields, or (3) awaits. We see that it exits after calling liftIO, causing the pipeline to terminate, but not before it prints its "I was called" message.

There's really not too much to understanding conduit evaluation. It mostly works the way you'd expect, as long as you remember that downstream drives.
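To see downstream driving from the other direction, here is a sketch with a hypothetical twoAwaits sink that asks for exactly two values and then stops. Since nothing downstream ever demands a third value, iterMC print only runs for the first two elements, so this should print just 1 and 2, even though yieldMany could supply ten.

```haskell
#!/usr/bin/env stack
-- stack script --resolver lts-12.21
import Conduit

-- Hypothetical sink: requests two values from upstream, ignores them, and exits.
twoAwaits :: Monad m => ConduitT i o m ()
twoAwaits = do
    _ <- await
    _ <- await
    return ()

main :: IO ()
main = runConduit
     $ yieldMany [1..10 :: Int]
    .| iterMC print
    .| twoAwaits
```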

Resource allocation

Let's copy a file with conduit:

```haskell
#!/usr/bin/env stack
-- stack script --resolver lts-12.21
import Conduit
import qualified System.IO as IO

main :: IO ()
main = IO.withBinaryFile "input.txt" IO.ReadMode $ \inH ->
       IO.withBinaryFile "output.txt" IO.WriteMode $ \outH ->
       runConduit $ sourceHandle inH .| sinkHandle outH
```

This works nicely, and follows the bracket pattern we typically expect in Haskell. However, it has some downsides:

  • You have to allocate all of your resources outside of the conduit pipeline. (This is because conduit is coroutine based, and coroutines/continuations cannot guarantee a cleanup action is called.)
  • You will sometimes end up needing to allocate too many resources, or holding onto them for too long, if you allocate them in advance instead of on demand.
  • Some control flows are impossible. For example, if you wanted to write a function to traverse a directory tree, you can't open up all of the directory handles before you enter your conduit pipeline.

One slight improvement we can make is to switch over to the withSourceFile and withSinkFile helper functions, which handle the calls to withBinaryFile for you:

```haskell
#!/usr/bin/env stack
-- stack script --resolver lts-12.21
import Conduit

main :: IO ()
main = withSourceFile "input.txt" $ \source ->
       withSinkFile "output.txt" $ \sink ->
       runConduit $ source .| sink
```

However, this only slightly improves ergonomics; most of the problems above remain. To solve those (and some others), conduit provides built-in support for a related package (resourcet), which allows you to allocate resources and be guaranteed that they will be cleaned up. The basic idea is that you'll have a block like:

```haskell
runResourceT $ do
    foo
    bar
    baz
```

Any resources that foo, bar, or baz allocate have a cleanup function registered in a mutable map. When the runResourceT call exits, all of those cleanup functions are called, regardless of whether the exiting occurred normally or via an exception.
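To make that concrete outside of conduit, here is a small sketch using the resourcet package's Control.Monad.Trans.Resource module directly, assuming its register and runResourceT functions. Each block registers a cleanup action that just prints a message. The first block exits normally; the second throws an IOException, and its cleanup should still print before the exception reaches try.

```haskell
#!/usr/bin/env stack
-- stack script --resolver lts-12.21
import Control.Exception (IOException, throwIO, try)
import Control.Monad.IO.Class (liftIO)
import Control.Monad.Trans.Resource (register, runResourceT)

main :: IO ()
main = do
    -- Normal exit: the registered cleanup runs when runResourceT returns.
    runResourceT $ do
        _ <- register (putStrLn "cleanup 1")
        liftIO (putStrLn "body 1")

    -- Exceptional exit: the cleanup still runs before the exception escapes.
    res <- try $ runResourceT $ do
        _ <- register (putStrLn "cleanup 2")
        liftIO (throwIO (userError "boom"))
    print (res :: Either IOException ())
```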

In order to do this in a conduit, we have the built-in function bracketP, which takes an allocation function and a cleanup function, and provides you a resource. Putting this all together, we can rewrite our example as:

```haskell
#!/usr/bin/env stack
-- stack script --resolver lts-12.21
import Conduit
import qualified System.IO as IO
import Data.ByteString (ByteString)

sourceFile' :: MonadResource m => FilePath -> ConduitT i ByteString m ()
sourceFile' fp =
    bracketP (IO.openBinaryFile fp IO.ReadMode) IO.hClose sourceHandle

sinkFile' :: MonadResource m => FilePath -> ConduitT ByteString o m ()
sinkFile' fp =
    bracketP (IO.openBinaryFile fp IO.WriteMode) IO.hClose sinkHandle

main :: IO ()
main = runResourceT
     $ runConduit
     $ sourceFile' "input.txt"
    .| sinkFile' "output.txt"
```

But that's certainly too tedious. Fortunately, conduit provides the sourceFile and sinkFile functions built in, and defines a helper runConduitRes which is just runResourceT . runConduit. Putting all of that together, copying a file becomes absolutely trivial:

```haskell
#!/usr/bin/env stack
-- stack script --resolver lts-12.21
import Conduit

main :: IO ()
main = runConduitRes $ sourceFile "input.txt" .| sinkFile "output.txt"
```

Let's get a bit more inventive, though. Let's traverse an entire directory tree and write the contents of all files with a .hs file extension into the file "all-haskell-files".

```haskell
#!/usr/bin/env stack
-- stack script --resolver lts-12.21
import Conduit
import System.FilePath (takeExtension)

main :: IO ()
main = runConduitRes
     $ sourceDirectoryDeep True "."
    .| filterC (\fp -> takeExtension fp == ".hs")
    .| awaitForever sourceFile
    .| sinkFile "all-haskell-files"
```

What's great about this example is:

  • It guarantees that only two file handles are open at a time: the all-haskell-files destination file and whichever file is being read from.
  • It will only open as many directory handles as needed to traverse the depth of the file structure.
  • If any exceptions occur, all resources will be cleaned up.
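The same traversal works with any other sink. As a sketch, swapping sinkFile for lengthCE (a chunked function that counts the bytes inside each ByteString chunk) gives the total size of all .hs files, still with at most one source file open at a time. The function name haskellBytes is hypothetical.

```haskell
#!/usr/bin/env stack
-- stack script --resolver lts-12.21
import Conduit
import System.FilePath (takeExtension)

-- Hypothetical helper: total size in bytes of every .hs file under dir.
haskellBytes :: FilePath -> IO Int
haskellBytes dir = runConduitRes
     $ sourceDirectoryDeep True dir
    .| filterC (\fp -> takeExtension fp == ".hs")
    -- Each file is opened, streamed, and closed before the next one.
    .| awaitForever sourceFile
    -- Count the bytes inside the ByteString chunks, not the chunks themselves.
    .| lengthCE

main :: IO ()
main = haskellBytes "." >>= print
```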

Chunked data

I'd like to read a file, convert all of its characters to upper case, and then write it to standard output. That looks pretty straightforward:

```haskell
#!/usr/bin/env stack
-- stack script --resolver lts-12.21
import Conduit
import qualified Data.Text as T
import Data.Char (toUpper)

main :: IO ()
main = runConduitRes
     $ sourceFile "input.txt"
    .| decodeUtf8C
    .| mapC (T.map toUpper)
    .| encodeUtf8C
    .| stdoutC
```

This works just fine, but is inconvenient: isn't that mapC (T.map ...) repetition just completely jarring? The issue is that instead of having a stream of Char values, we have a stream of Text values, and our mapC function works on the Texts. But our toUpper function works on the Chars inside of the Text. We want to use Text (or ByteString, or sometimes Vector) because it's a more efficient representation of data, but we don't want to have to deal with this overhead.

This is where the chunked functions in conduit come into play. In addition to functions that work directly on the values in a stream, we have functions that work on the elements inside those values. These functions get a CE suffix instead of C, and are very straightforward to use. To see it in action:

```haskell
#!/usr/bin/env stack
-- stack script --resolver lts-12.21
import Conduit
import Data.Char (toUpper)

main :: IO ()
main = runConduitRes
     $ sourceFile "input.txt"
    .| decodeUtf8C
    .| omapCE toUpper
    .| encodeUtf8C
    .| stdoutC
```

NOTE We also had to prepend o to get the monomorphic mapping function, since Text is a monomorphic container.
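To see the difference between the two mapping functions, here is a small pure sketch. A list is a Functor, so the polymorphic mapCE can map over the elements of each list chunk; Text can only ever hold Chars, so it needs omapCE. The value names are illustrative only.

```haskell
#!/usr/bin/env stack
-- stack script --resolver lts-12.21
import Conduit
import Data.Char (toUpper)
import qualified Data.Text as T

-- Lists are Functors, so mapCE maps over the Ints inside each chunk.
doubledChunks :: [[Int]]
doubledChunks = runConduitPure $ yieldMany [[1, 2], [3]] .| mapCE (* 2) .| sinkList

-- Text is monomorphic (its elements are always Char), so we use omapCE.
upperChunks :: [T.Text]
upperChunks = runConduitPure $ yieldMany [T.pack "ab", T.pack "c"] .| omapCE toUpper .| sinkList

main :: IO ()
main = do
    print doubledChunks -- [[2,4],[6]]
    print upperChunks   -- ["AB","C"]
```

Note that the chunk boundaries are preserved: both functions transform elements in place without merging or splitting chunks.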

We can use this for other things too. For example, let's get just the first line of content:

```haskell
#!/usr/bin/env stack
-- stack script --resolver lts-12.21
import Conduit

main :: IO ()
main = runConduitRes
     $ sourceFile "input.txt"
    .| decodeUtf8C
    .| takeWhileCE (/= '\n')
    .| encodeUtf8C
    .| stdoutC
```

Or just the first 5 bytes:

```haskell
#!/usr/bin/env stack
-- stack script --resolver lts-12.21
import Conduit

main :: IO ()
main = runConduitRes
     $ sourceFile "input.txt"
    .| takeCE 5
    .| stdoutC
```

There are many other functions available for working on chunked data. In fact, most non-chunked functions have a chunked equivalent. This means that most of the intuition you've built up for working with streams of values will automatically translate to dealing with chunked streams, a big win for binary and textual processing.
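As a sketch of how the C and CE variants line up, here are lengthC/lengthCE and takeC/takeCE applied to the same two-chunk Text stream. The C versions count or take whole chunks; the CE versions count or take the Chars inside them. The chunk contents and value names are made up for illustration.

```haskell
#!/usr/bin/env stack
-- stack script --resolver lts-12.21
import Conduit
import qualified Data.Text as T

chunks :: [T.Text]
chunks = [T.pack "hello", T.pack "world"]

-- lengthC counts the values in the stream (here, two chunks) ...
chunkCount :: Int
chunkCount = runConduitPure $ yieldMany chunks .| lengthC

-- ... while lengthCE counts the elements inside them (ten Chars).
charCount :: Int
charCount = runConduitPure $ yieldMany chunks .| lengthCE

-- takeC 1 passes one whole chunk through; takeCE 3 passes three Chars,
-- splitting the first chunk and leaving the rest as a leftover.
firstChunk, firstThreeChars :: [T.Text]
firstChunk      = runConduitPure $ yieldMany chunks .| takeC 1 .| sinkList
firstThreeChars = runConduitPure $ yieldMany chunks .| takeCE 3 .| sinkList

main :: IO ()
main = do
    print (chunkCount, charCount)       -- (2,10)
    print (firstChunk, firstThreeChars) -- (["hello"],["hel"])
```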

EXERCISE Try to implement the takeCE function on ByteStrings. Hint: you'll need to use leftover to make it work correctly!

ZipSink

So far we've had very linear pipelines: a component feeds into exactly one downstream component, and so on. However, sometimes we may wish to allow for multiple consumers of a single stream. As a motivating example, let's consider taking the average of a stream of Doubles. In the list world, this may look like:

```haskell
#!/usr/bin/env stack
-- stack script --resolver lts-12.21
doubles :: [Double]
doubles = [1, 2, 3, 4, 5, 6]

average :: [Double] -> Double
average xs = sum xs / fromIntegral (length xs)

main :: IO ()
main = print $ average doubles
```

However, performance aficionados will quickly point out that this has a space leak: the list will be traversed once for the sum, kept in memory, and then traversed a second time for the length. We could work around that by using lower-level functions, but we lose composability. (Though see the foldl package for composable folding.)
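For comparison, a sketch of that lower-level workaround: a single strict foldl' that tracks the running sum and count together, so the list can be consumed in one pass. The step function is hypothetical and written just for this example.

```haskell
#!/usr/bin/env stack
-- stack script --resolver lts-12.21
{-# LANGUAGE BangPatterns #-}
import Data.List (foldl')

-- One strict pass: the sum and the count are accumulated in the same fold,
-- so the list never needs to be traversed (or retained) a second time.
average :: [Double] -> Double
average xs = total / fromIntegral len
  where
    (total, len) = foldl' step (0, 0 :: Int) xs
    -- Bang patterns force the previous sum and count at every step.
    step (!s, !n) x = (s + x, n + 1)

main :: IO ()
main = print $ average [1, 2, 3, 4, 5, 6] -- 3.5
```

This avoids the leak, but sum and length are now fused into one hand-written step function rather than reused as separate building blocks. The conduit version keeps sumC and lengthC separate: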

```haskell
#!/usr/bin/env stack
-- stack script --resolver lts-12.21
import Conduit

doubles :: [Double]
doubles = [1, 2, 3, 4, 5, 6]

average :: Monad m => ConduitT Double Void m Double
average =
    getZipSink (go <$> ZipSink sumC <*> ZipSink lengthC)
  where
    go total len = total / fromIntegral len

main :: IO ()
main = print $ runConduitPure $ yieldMany doubles .| average
```

ZipSink is a newtype wrapper which provides a different Applicative instance than the standard one for ConduitT. Instead of sequencing the consumption of a stream, it allows two components to consume in parallel. Now, our sumC and lengthC are getting values at the same time, and then those values can be immediately thrown away. This leads to easy composition and constant memory usage.
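The combination is not limited to two consumers. As a small sketch, three independent sinks (minimumC, maximumC, and lengthC) can share a single pass over the stream; the name stats and the input numbers are illustrative only.

```haskell
#!/usr/bin/env stack
-- stack script --resolver lts-12.21
import Conduit

-- Hypothetical example: three consumers fed from one pass over the stream.
stats :: Monad m => ConduitT Int Void m (Maybe Int, Maybe Int, Int)
stats = getZipSink $
    (,,) <$> ZipSink minimumC
         <*> ZipSink maximumC
         <*> ZipSink lengthC

main :: IO ()
main = print $ runConduitPure $ yieldMany [3, 1, 4, 1, 5, 9, 2, 6] .| stats
-- (Just 1,Just 9,8)
```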

NOTE Both the list and conduit versions of average divide by zero on an empty input; since the values are Doubles, this silently produces NaN rather than throwing an error. In practice, you'd probably want to make average return a Maybe Double.
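A sketch of that suggestion: the same ZipSink combination, but the final step returns Nothing when the count is zero. The name safeAverage is hypothetical.

```haskell
#!/usr/bin/env stack
-- stack script --resolver lts-12.21
import Conduit

-- Hypothetical variant of average that reports an empty stream as Nothing.
safeAverage :: Monad m => ConduitT Double Void m (Maybe Double)
safeAverage = getZipSink (go <$> ZipSink sumC <*> ZipSink lengthC)
  where
    go :: Double -> Int -> Maybe Double
    go _ 0 = Nothing
    go total len = Just (total / fromIntegral len)

main :: IO ()
main = do
    print $ runConduitPure $ yieldMany [] .| safeAverage                 -- Nothing
    print $ runConduitPure $ yieldMany [1, 2, 3, 4, 5, 6] .| safeAverage -- Just 3.5
```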

Another real-world example of ZipSink is when you want to both consume a file and calculate its cryptographic hash. Working with the cryptonite and cryptonite-conduit libraries:

```haskell
#!/usr/bin/env stack
-- stack script --resolver lts-12.21
import Conduit
import Crypto.Hash.Conduit (sinkHash)
import Crypto.Hash (Digest, SHA256)

main :: IO ()
main = do
    digest <- runConduitRes
            $ sourceFile "input.txt"
           .| getZipSink (ZipSink (sinkFile "output.txt") *> ZipSink sinkHash)
    print (digest :: Digest SHA256)
```

Or we can get slightly more inventive, and read from an HTTP connection instead of a local file:

```haskell
#!/usr/bin/env stack
-- stack script --resolver lts-12.21
{-# LANGUAGE OverloadedStrings #-}
import Conduit
import Crypto.Hash.Conduit (sinkHash)
import Crypto.Hash (Digest, SHA256)
import Network.HTTP.Simple (httpSink)

main :: IO ()
main = do
    digest <- runResourceT $ httpSink "http://httpbin.org"
              (\_res -> getZipSink (ZipSink (sinkFile "output.txt") *> ZipSink sinkHash))
    print (digest :: Digest SHA256)
```

This provides a convenient and efficient method to consume data over a network connection.

ZipSource

Let's keep a good thing going. In addition to consuming in parallel, we may wish to produce in parallel. For this, we'll use the ZipSource newtype wrapper, which is very similar in concept to the ZipList wrapper, for those familiar with it. As a simple example, let's create a stream of the Fibonacci numbers, together with each one's index in the sequence:

```haskell
#!/usr/bin/env stack
-- stack script --resolver lts-12.21
import Conduit

fibs :: [Int]
fibs = 0 : 1 : zipWith (+) fibs (drop 1 fibs)

indexedFibs :: ConduitT () (Int, Int) IO ()
indexedFibs = getZipSource
    $ (,)
  <$> ZipSource (yieldMany [1..])
  <*> ZipSource (yieldMany fibs)

main :: IO ()
main = runConduit $ indexedFibs .| takeC 10 .| mapM_C print
```
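Like zip and ZipList, a ZipSource stops producing as soon as any of its sources runs out. In the example above both sources are infinite, so takeC 10 does the bounding; in this small sketch, the finite "abc" source ends the stream even though [1 ..] is infinite. The name pairs is illustrative only.

```haskell
#!/usr/bin/env stack
-- stack script --resolver lts-12.21
import Conduit

-- The zipped source ends when the shorter (finite) source is exhausted.
pairs :: [(Int, Char)]
pairs = runConduitPure
      $ getZipSource ((,) <$> ZipSource (yieldMany [1 ..]) <*> ZipSource (yieldMany "abc"))
     .| sinkList

main :: IO ()
main = print pairs -- [(1,'a'),(2,'b'),(3,'c')]
```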

ZipConduit

To round out the collection of newtype wrappers, we've got ZipConduit, which is certainly the most complicated of the bunch. It allows you to combine a bunch of transformers, running them as follows:

  • Drain all of the ZipConduits of all yielded values, until they are all awaiting
  • Grab the next value from upstream, and feed it to all of the ZipConduits
  • Repeat
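A minimal sketch of those steps: two transformers combined with ZipConduit both receive every upstream value, and their outputs are merged into a single downstream stream. The test sorts the result so that it only checks which values arrive, not the exact interleaving. The name broadcast is hypothetical.

```haskell
#!/usr/bin/env stack
-- stack script --resolver lts-12.21
import Conduit
import Data.List (sort)

-- Hypothetical example: each upstream Int is fed to both inner conduits.
broadcast :: [Int]
broadcast = runConduitPure
     $ yieldMany [1, 2, 3]
    .| getZipConduit (ZipConduit (mapC (* 2)) *> ZipConduit (mapC (+ 100)))
    .| sinkList

main :: IO ()
main = print (sort broadcast) -- [2,4,6,101,102,103]
```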

Here's a silly example of using it, which demonstrates its most common use case: focusing in on a subset of a stream. We split a stream of numbers into evens (Left) and odds (Right). Then we have two transformers that each look at only half the stream, and combine those two transformers together into a single transformer that looks at the whole stream:

```haskell
#!/usr/bin/env stack
-- stack script --resolver lts-12.21
import Conduit

tagger :: Monad m => ConduitT Int (Either Int Int) m ()
tagger = mapC $ \i -> if even i then Left i else Right i

evens, odds :: Monad m => ConduitT Int String m ()
evens = mapC $ \i -> "Even number:" ++ show i
odds  = mapC $ \i -> "Odd  number:" ++ show i

left :: Either l r -> Maybe l
left = either Just (const Nothing)

right :: Either l r -> Maybe r
right = either (const Nothing) Just

inside :: Monad m => ConduitT (Either Int Int) String m ()
inside = getZipConduit
       $ ZipConduit (concatMapC left .| evens)
      *> ZipConduit (concatMapC right .| odds)

main :: IO ()
main = runConduit $ enumFromToC 1 10 .| tagger .| inside .| mapM_C putStrLn
```

In my experience, the most useful of the three newtype wrappers is ZipSink, but your mileage may vary.

Forced consumption

Remember that, in our evaluation method for conduit, we stop processing as soon as downstream stops. There are some cases where this is problematic, specifically when we want to ensure a specific amount of data is consumed. Consider:

```haskell
#!/usr/bin/env stack
-- stack script --resolver lts-12.21
import Conduit

withFiveSum :: Monad m
            => ConduitT Int o m r
            -> ConduitT Int o m (r, Int)
withFiveSum inner = do
    r <- takeC 5 .| inner
    s <- sumC
    return (r, s)

main :: IO ()
main = print $ runConduitPure $ yieldMany [1..10] .| withFiveSum sinkList
```

Our withFiveSum function will let the provided inner conduit work on the first five values in the stream, then take the sum of the rest. All seems well, but now consider if we replace sinkList with return (). Our takeC 5 .| return () will no longer consume any of the first five values, and sumC will end up consuming them. Depending on your use case, this could be problematic, and very surprising.
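To make the surprise concrete, here is a sketch that runs the same withFiveSum (repeated so the snippet is self-contained) with both inner conduits. The names withList and withUnit are illustrative. With sinkList the rest sums to 6 + 7 + 8 + 9 + 10 = 40; with return (), nothing ever pulls from takeC 5, so sumC sees all ten values.

```haskell
#!/usr/bin/env stack
-- stack script --resolver lts-12.21
import Conduit

withFiveSum :: Monad m
            => ConduitT Int o m r
            -> ConduitT Int o m (r, Int)
withFiveSum inner = do
    r <- takeC 5 .| inner
    s <- sumC
    return (r, s)

-- sinkList consumes all five values, so sumC only sees 6..10.
withList :: ([Int], Int)
withList = runConduitPure $ yieldMany [1 .. 10] .| withFiveSum sinkList

-- return () never awaits, so sumC ends up summing 1..10.
withUnit :: ((), Int)
withUnit = runConduitPure $ yieldMany [1 .. 10] .| withFiveSum (return ())

main :: IO ()
main = do
    print withList -- ([1,2,3,4,5],40)
    print withUnit -- ((),55)
```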

We can work around this by forcing any of the first five values that inner doesn't consume to be dropped, e.g.:

```haskell
#!/usr/bin/env stack
-- stack script --resolver lts-12.21
import Conduit

withFiveSum :: Monad m
            => ConduitT Int o m r
            -> ConduitT Int o m (r, Int)
withFiveSum inner = do
    r <- takeC 5 .| do
        r <- inner
        sinkNull
        return r
    s <- sumC
    return (r, s)

main :: IO ()
main = print $ runConduitPure $ yieldMany [1..10] .| withFiveSum (return ())
```

However, there's also a convenience function which captures this pattern: takeExactlyC:

```haskell
#!/usr/bin/env stack
-- stack script --resolver lts-12.21
import Conduit

withFiveSum :: Monad m
            => ConduitT Int o m r
            -> ConduitT Int o m (r, Int)
withFiveSum inner = do
    r <- takeExactlyC 5 inner
    s <- sumC
    return (r, s)

main :: IO ()
main = print $ runConduitPure $ yieldMany [1..10] .| withFiveSum (return ())
```

Notice that there's no .| operator between takeExactlyC 5 and inner. That's not a typo! takeExactlyC isn't actually a conduit; it's a combinator which, when given a conduit, will generate a conduit.
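As a quick check of that behavior, here is a sketch where the inner conduit only looks at two of its five values. takeExactlyC still drains the remaining three, so the sum is taken over 6..10 as intended. The value name firstTwo is illustrative.

```haskell
#!/usr/bin/env stack
-- stack script --resolver lts-12.21
import Conduit

withFiveSum :: Monad m
            => ConduitT Int o m r
            -> ConduitT Int o m (r, Int)
withFiveSum inner = do
    r <- takeExactlyC 5 inner
    s <- sumC
    return (r, s)

-- The inner conduit reads only 1 and 2; takeExactlyC drops 3, 4 and 5.
firstTwo :: ([Int], Int)
firstTwo = runConduitPure $ yieldMany [1 .. 10] .| withFiveSum (takeC 2 .| sinkList)

main :: IO ()
main = print firstTwo -- ([1,2],40)
```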

EXERCISE Try to write takeExactlyC as a conduit itself, and/or convince yourself why that's impossible.

This same kind of pattern is used to deal with the stream-of-streams problem. As a motivating example, consider processing a file, and wanting to work on it one line at a time. One possibility is to simply break the stream into one Text per line, but this can be dangerous if your input is untrusted and may contain an unbounded line length. Instead, we can just do:

```haskell
#!/usr/bin/env stack
-- stack script --resolver lts-12.21
import Conduit

main :: IO ()
main = runConduitRes
     $ sourceFile "input.txt"
    .| decodeUtf8C
    .| do
        len <- lineC lengthCE
        liftIO $ print len
```

This program will print out the length of the first line of the input file. However, by combining it with the peekForeverE combinator (which will continuously run a conduit as long as there is some input available in a chunked stream), we can print out the length of each line:

```haskell
#!/usr/bin/env stack
-- stack script --resolver lts-12.21
import Conduit

main :: IO ()
main = runConduitRes
     $ sourceFile "input.txt"
    .| decodeUtf8C
    .| peekForeverE (do
        len <- lineC lengthCE
        liftIO $ print len)
```
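Because lineC and peekForeverE work on any chunked stream, the same logic can be tried without a file. This sketch yields each line length downstream instead of printing it, so the result can be inspected purely. The helper name lineLengths is hypothetical. Note that an empty line counts as length 0, and a final line without a trailing newline is still reported.

```haskell
#!/usr/bin/env stack
-- stack script --resolver lts-12.21
import Conduit
import qualified Data.Text as T

-- Hypothetical helper: length of every line in a piece of Text.
lineLengths :: T.Text -> [Int]
lineLengths txt = runConduitPure
     $ yield txt
    -- Run the inner conduit once per line while input remains;
    -- lineC also consumes the newline that ends each line.
    .| peekForeverE (lineC lengthCE >>= yield)
    .| sinkList

main :: IO ()
main = print $ lineLengths (T.pack "ab\ncde\n\nf") -- [2,3,0,1]
```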

FAQs

  • How do you deal with an upstream conduit that has a return value? Use the special fusion functions for it (such as fuseBoth and fuseUpstream); see the haddocks.
  • How do you capture unconsumed leftover values? Again, use the special fusion functions for it (such as fuseLeftovers and fuseReturnLeftovers); see the haddocks.
  • How do I run a source, take some of its output, and then run the rest of it later? See the "Connect and resume" section of the haddocks.
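As a sketch of the first answer: with plain .|, an upstream's return value is discarded, while fuseBoth from Data.Conduit keeps both the upstream and downstream results. The upstream conduit numbersThenLabel and its "done" result are made up for illustration.

```haskell
#!/usr/bin/env stack
-- stack script --resolver lts-12.21
import Conduit
import Data.Conduit (fuseBoth)

-- Hypothetical upstream: yields some numbers, then returns a label.
numbersThenLabel :: Monad m => ConduitT i Int m String
numbersThenLabel = do
    yieldMany [1 .. 5]
    return "done"

-- fuseBoth returns (upstream result, downstream result).
results :: (String, Int)
results = runConduitPure $ fuseBoth numbersThenLabel sumC

main :: IO ()
main = print results -- ("done",15)
```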

More exercises

Write a conduit that consumes a stream of Ints. It takes the first Int from the stream, and then multiplies all subsequent Ints by that number and sends them back downstream. You should use the mapC function for this.

Take a file and, for each line, print out the number of bytes in theline (try using bytestring directly and then conduit).

Further exercises wanted, please feel free to send PRs!

Legacy syntax

As of version 1.2.8 of conduit, released September 2016, the operators and function names used above are recommended. However, prior to that, an alternative set of functions and operators was used instead. You may still find code and documentation out there which follows the legacy syntax, so it's worth being aware of it. Basically:

  • Instead of .|, we had three operators: $=, =$, and =$=. These were all synonyms, and existed for historical reasons.
  • The $$ operator is a combination of runConduit and .|.

To put it simply in code:

```haskell
x $=  y = x .| y
x =$  y = x .| y
x =$= y = x .| y
x $$  y = runConduit (x .| y)
```

If the old operators seem needlessly confusing/redundant... well, that's why we have new operators :).

Prior to the 1.3.0 release in February 2018, there were different data types and type synonyms available. In particular, instead of ConduitT, we had ConduitM, and we also had the following synonyms:

```haskell
type Source   m o   = ConduitM () o m ()
type Sink     i m r = ConduitM i Void m r
type Conduit  i m o = ConduitM i o m ()
type Producer m o   = forall i. ConduitM i o m ()
type Consumer i m r = forall o. ConduitM i o m r
```

These older names are all still available, but they've been deprecated to simplify the package.

Further reading

Some blog posts making heavy use of conduit:

If you have other articles to include, please send a PR!

