Hi! Welcome, if you’re joining us from the great Advent of Haskell 2020 event! Feel free to grab a hot chocolate and sit back by the fireplace. I’m honored to be able to be a part of the event this year; it’s a great initiative and harkens back to the age-old Haskell tradition of bite-sized Functional Programming “advent calendars”. I remember when I was first learning Haskell, Ollie Charles’ 24 Days of Hackage series was one of my favorite series that helped me really get into the exciting world of Haskell and all the doors that functional programming can open.
All of the posts this year have been great, ranging from insightful reflections on the nature of Haskell and programming in Haskell to explorations of specific language features. This post is going to be one of the “project-based” ones, where we walk through and introduce a solidly intermediate Haskell technique as it applies to building a useful general toolset. I’m going to be exploring the “functor combinator style” where you identify the interface you want, associate it with a common Haskell typeclass, pick your primitives, and automatically get the ability to imbue your primitives with the structure you need. I’ve talked about this previously with:
- Applicative regular expressions
- The functor combinatorpedia
- Bidirectional serializers
- Composable interpreters
and I wanted to share a recent application where just thinking about the primitives gave me almost all the functionality I needed for a type: composable streaming combinators. This specific application is also easy to integrate into any composable effects system, since it’s essentially a monadic interface.
In a way, this post could also be seen as capturing the spirit of the holidays by reminiscing about the days of yore: looking back at one of the more exciting times in modern Haskell’s development, where competing composable streaming libraries were at the forefront of practical innovation. The dust has settled on that a bit, but every time I think about composable streaming combinators, I do get a bit nostalgic :)
This post is written for an intermediate Haskell audience, and will assume you have a familiarity with monads and monadic interfaces, and also a little bit of experience with monad transformers. Note that there are many ways to arrive at the same result, but this post is more of a demonstration of a certain style and approach that has benefited me greatly in the past.
All of the code in this page can be found online at GitHub!
Dreaming of an Effectful Christmas
The goal here is to make a system of composable pipes that are “pull-based”, so we can process data as it is read in from IO only as we need it, and never do more work than we need to do up-front or leak memory when we stop using it.
So, the way I usually approach things like these is: “dress for the interface you want, not the one you have.” It involves:
- Thinking of the m a you want and how you would want to combine it/use it.
- Expressing the primitive actions of that thing.
- Using some sort of free structure or effects system to enhance that primitive with the interface you are looking for.
So, let’s imagine our type!
Pipe i o m a represents a pipe component where:
i: the type of the input the pipe expects from upstream
o: the type of the output the pipe will be yielding downstream
m: the monad that the underlying actions live in
a: the overall result of the pipe once it has terminated.
One nice thing about this setup is that by picking different values for the type parameters, we can already get a nice classification for interesting subtypes:
- If i is () (or universally quantified), we have a Pipe () o m a: the pipe doesn’t ever expect any sort of information from upstream, and so can be considered a “source” that keeps on churning out values.
- If o is Void (or universally quantified), we have a Pipe i Void m a: the pipe will never yield anything downstream, because Void has no inhabitants that could possibly be yielded. This means that it acts like a “sink” that will keep on eating i values without ever outputting anything downstream.
- If i is () and o is Void (or they are both universally quantified), then the pipe doesn’t expect any sort of information from upstream, and also won’t ever yield anything downstream… a Pipe () Void m a is just an m a! In the biz, we often call this an “effect”.
- If a is Void (or universally quantified), we have a Pipe i o m Void: the pipe will never terminate, since Void has no inhabitants that it could possibly produce upon termination.
To me, I think it embodies a lot of the nice principles about the “algebra” of types that can be used to reason with inputs and outputs. Plus, it allows us to unify sources, sinks, and non-terminating pipes all in one type!
Now let’s think of the interface we want. We want to be able to:
    -- | Yield a value `o` downstream
    yield :: o -> Pipe i o m ()

    -- | Await a value `i` upstream
    await :: Pipe i o m (Maybe i)

    -- | Terminate immediately with a result value
    return :: a -> Pipe i o m a

    -- | Sequence pipes one-after-another:
    -- "do this until it terminates, then that one next"
    (>>) :: Pipe i o m a -> Pipe i o m b -> Pipe i o m b

    -- | In fact let's just make it a full fledged monad, why not? We're designing
    -- our dream interface here.
    (>>=) :: Pipe i o m a -> (a -> Pipe i o m b) -> Pipe i o m b

    -- | A pipe that simply does action in the underlying monad and terminates with
    -- the result
    lift :: m a -> Pipe i o m a

    -- | Compose pipes, linking the output of one to the input of the other
    (.|) :: Pipe i j m a -> Pipe j o m b -> Pipe i o m b

    -- | Finally: run it all on a pipe expecting no input and never yielding:
    runPipe :: Pipe () Void m a -> m a
This looks like a complicated list…but actually most of these come from ubiquitous Haskell typeclasses like Applicative. We’ll see how this comes into play later, when we learn how to get these instances for our types for free. This makes the actual “work” we have to do very small.
So, these are going to be implementing “conduit-style” streaming combinators, where streaming actions are monadic, and monadic sequencing represents “do this after this one is done.” Because of this property, they work well as pull-based pipes: yields will block until a corresponding await can accept what is yielded.
Put on those Christmas Sweaters
“Dress for the interface you want, not the one you have”. So let’s pretend we already implemented this interface…what could we do with it?
Well, we can write simple sources like “yield the contents from a file line-by-line”:
    -- source: https://github.com/mstksg/inCode/tree/master/code-samples/misc/streaming-combinators-free.hs#L65-L72
    sourceHandleIO :: Handle -> Pipe i String IO ()
    sourceHandleIO handle = do
        -- read one line, turning an end-of-file error into a Left
        res <- lift $ tryJust (guard . isEOFError) (hGetLine handle)
        case res of
          Left  _   -> return ()
          Right out -> do
            yield out
            sourceHandleIO handle
Note that because the i is universally quantified, it means that we know that sourceHandleIO never ever awaits or touches any input: it’s purely a source.
We can even write a simple sink, like “await and print the results to stdout as they come”:
And maybe we can write a pipe that takes input strings and converts them to all capital letters and re-yields them:
And we can maybe write a pipe that stops as soon as it reads the line STOP:
untilSTOP is really sort of the crux of what makes these streaming systems useful: we only pull items from the file as we need them, and untilSTOP will stop pulling anything as soon as we hit STOP, so no IO will happen anymore if the upstream source does IO.
Our Ideal Program
Now ideally, we’d want to write a program that lets us compose the above pipes to read from a file and output its contents to stdout, until it sees a STOP line:
Setting up our Stockings
Step 2 of our plan was to identify the primitive actions we want. Looking at our interface, it seems like the few things that let us “create” a Pipe from scratch (instead of combining existing ones) are yield, await, return, and lift.
However, we can note that lift and return can be gained just from having Monad and MonadTrans instances. So let’s assume we have those instances.
The functor combinator plan is to identify your primitives, and let free structures give you the instances (in our case, Monad and MonadTrans) you need for them.
So this means we only need two primitives: yield and await. Then we just throw them into some machinery that gives us a free Monad and MonadTrans structure, and we’re golden :)
In the style of the free library, we’d write a base functor: an ADT that describes the primitive actions:
The general structure of the base functor style is to represent each primitive as a constructor: include any inputs, and then a continuation on what to do if you had the result.
- For YieldF, you need an o to be able to yield. The second field should really be the continuation () -> a, since the result is (), but that’s equivalent to a in Haskell.
- For AwaitF, you don’t need any parameters to await, but the continuation is Maybe i -> a since you need to specify how to handle the Maybe i result.
(This is specifically the structure that free expects, but this principle can be ported to any algebraic effects system.)
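A minimal sketch of what such a base functor could look like, following the description above. The constructor names YieldF and AwaitF come from the text; the exact declaration (including the use of DeriveFunctor) is an assumption, and yielded and resume are hypothetical helpers added only to poke at the structure.

```haskell
{-# LANGUAGE DeriveFunctor #-}

-- | Base functor describing the two primitive pipe actions.
-- The last parameter 'a' is the "what happens next" slot.
data PipeF i o a
    = YieldF o a              -- ^ an output to yield, plus the continuation
    | AwaitF (Maybe i -> a)   -- ^ a continuation that handles the awaited input
    deriving Functor

-- | Hypothetical helper: the value a single layer yields, if any.
yielded :: PipeF i o a -> Maybe o
yielded (YieldF o _) = Just o
yielded (AwaitF _)   = Nothing

-- | Hypothetical helper: feed an input to a layer and get its continuation.
-- A yielding layer ignores the input.
resume :: Maybe i -> PipeF i o a -> a
resume _ (YieldF _ next) = next
resume i (AwaitF k)      = k i
```

The derived Functor instance only ever touches the continuation slot: mapping over a YieldF layer leaves the yielded value alone, and mapping over an AwaitF layer post-composes with the handler. That is exactly the shape a free structure needs in order to splice "the rest of the pipe" into each primitive.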
A Christmas Surprise
And now for the last ingredient: we can use the FreeT type from Control.Monad.Trans.Free, and now we have our pipe interface, with a Monad and MonadTrans instance!
This takes our base functor and imbues it with a full Monad and MonadTrans instance.
That’s the essence of the free structure: it adds to our base functor (PipeF) exactly the structure it needs to be able to implement the instances it is free on. And it’s all free as in beer! :D
As a bonus gift, we also get a MonadIO instance from FreeT as well.
Now we just need our functions to lift our primitives to Pipe, using liftF :: f a -> FreeT f m a.
(These things you can usually just fill in using type tetris, filling in values with typed holes until they typecheck.)
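A sketch of how the type synonym and the two lifted primitives could be written, assuming the free package and the PipeF shape described earlier. The names Pipe, yield and await come from the text; firstYield is a hypothetical helper that just peels one layer so we can see what liftF produced.

```haskell
{-# LANGUAGE DeriveFunctor #-}

import Control.Monad.Trans.Free (FreeF (..), FreeT (..), liftF)

data PipeF i o a
    = YieldF o a
    | AwaitF (Maybe i -> a)
    deriving Functor

-- | A pipe is the free monad transformer over the base functor.
type Pipe i o = FreeT (PipeF i o)

-- | Yield: the continuation slot holds the (trivial) result ().
yield :: Monad m => o -> Pipe i o m ()
yield x = liftF (YieldF x ())

-- | Await: the continuation simply returns whatever input arrives.
await :: Monad m => Pipe i o m (Maybe i)
await = liftF (AwaitF id)

-- | Hypothetical helper: peel one layer and report what it yields, if anything.
firstYield :: Monad m => Pipe i o m a -> m (Maybe o)
firstYield p = do
    step <- runFreeT p
    pure $ case step of
        Free (YieldF o _) -> Just o
        _                 -> Nothing
```

With these in scope, firstYield (yield 'x' >> yield 'y') produces Just 'x' in the underlying monad, while a pipe that begins with await produces Nothing, since its first layer is an AwaitF.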
Note that all of the individual pipes we had planned work as-is! And we can even make sinkStdout work for any MonadIO m => Pipe i o m a, because of the unexpected surprise Christmas gift we got (the MonadIO instance and liftIO :: MonadIO m => IO a -> Pipe i o m a). Remember, MonadIO m is basically any m that supports doing IO.
    -- source: https://github.com/mstksg/inCode/tree/master/code-samples/misc/streaming-combinators-free.hs#L74-L119
    sourceHandle :: MonadIO m => Handle -> Pipe i String m ()
    sourceHandle handle = do
        res <- liftIO $ tryJust (guard . isEOFError) (hGetLine handle)
        case res of
          Left  _   -> return ()
          Right out -> do
            yield out
            sourceHandle handle

    sinkStdout :: MonadIO m => Pipe String o m ()
    sinkStdout = do
        inp <- await
        case inp of
          Nothing -> pure ()
          Just x  -> do
            liftIO $ putStrLn x
            sinkStdout

    toUpperPipe :: Monad m => Pipe String String m ()
    toUpperPipe = do
        inp <- await
        case inp of
          Nothing -> pure ()
          Just x  -> do
            yield (map toUpper x)
            toUpperPipe

    untilSTOP :: Monad m => Pipe String String m ()
    untilSTOP = do
        inp <- await
        case inp of
          Nothing -> pure ()
          Just x
            | x == "STOP" -> pure ()
            | otherwise   -> do
                yield x
                untilSTOP
That’s because using FreeT, we imbue the structure required to do monadic chaining (do notation) and MonadTrans (lift) and MonadIO (liftIO) for free!
To “run” our pipes, we can use FreeT’s “interpreter” function, iterT. This follows the same pattern as for many free structures: specify how to handle each individual base functor constructor, and it then gives you a handler to handle the entire thing.
So let’s write our base functor handler. Remember that we established earlier we can only “run” a Pipe () Void m a: that is, pipes where await can always be fed with no information (()) and no yield is ever called (because you cannot yield with Void, a type with no inhabitants). We can directly translate this to how we handle each constructor:
And so we get our full runPipe:
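Putting the handler and the interpreter together, a minimal sketch might look like the following. It assumes the free package and the PipeF shape described earlier; handlePipeF and twoAwaits are names chosen here for illustration, and only await is defined since the example never yields.

```haskell
{-# LANGUAGE DeriveFunctor #-}

import Control.Monad.Trans.Free (FreeT, iterT, liftF)
import Data.Void (Void, absurd)

data PipeF i o a
    = YieldF o a
    | AwaitF (Maybe i -> a)
    deriving Functor

type Pipe i o = FreeT (PipeF i o)

await :: Monad m => Pipe i o m (Maybe i)
await = liftF (AwaitF id)

-- | Handle a single layer whose continuations are already actions in 'm'.
handlePipeF :: PipeF () Void (m a) -> m a
handlePipeF (YieldF o _) = absurd o      -- impossible: Void has no values
handlePipeF (AwaitF f)   = f (Just ())   -- an await of () can always be answered

-- | Interpret the whole pipe by handling it one layer at a time.
runPipe :: Monad m => Pipe () Void m a -> m a
runPipe = iterT handlePipeF

-- | Hypothetical example: count how many of two awaits were answered.
twoAwaits :: Monad m => Pipe () Void m Int
twoAwaits = do
    a <- await
    b <- await
    pure (length [() | Just () <- [a, b]])
```

Running runPipe twoAwaits in any monad returns 2, because the handler answers every await with Just (). The YieldF case needs no real code at all: absurd turns the impossible Void value into whatever type is required.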
I think this process exemplifies most of the major beats when working with free structures:
- Define the base functor
- Allow the free structure to imbue the proper structure over your base functor
- Write your interpreter to interpret the constructors of your base functor, and the free structure will give you a way to interpret the entire structure.
The Final Ornament
If you look at the list of all the things we wanted, we’re still missing one thing: pipe composition/input-output chaining. That’s because it isn’t a primitive operation (like yield or await), and it wasn’t given to us for free by our free structure (FreeT, which gave us monadic composition and monad transformer ability). So with how we have currently written it, there isn’t any way of getting around writing (.|) manually. So let’s roll up our sleeves and do the (admittedly minimal amount of) dirty work.
Let’s think about the semantics of our pipe chaining. We want to never do more work than we need to do, so we’ll be “pull-based”: for f .| g, try running g as much as possible until it awaits anything from f. Only then do we try doing f.
To implement this, we’re going to have to dig in a little bit to the implementation/structure of FreeT.
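For reference, the two types at the heart of Control.Monad.Trans.Free are essentially the following. They are re-declared locally here (with instances omitted) so the sketch loads without the free package; depth is a hypothetical helper that shows what "unrolling one layer at a time" looks like.

```haskell
-- | One unrolled layer: either a final result, or a functor layer whose
-- holes contain "the rest" of the computation.
data FreeF f a b = Pure a | Free (f b)

-- | An action in 'm' that produces one such layer, where "the rest" is
-- again a FreeT.
newtype FreeT f m a = FreeT { runFreeT :: m (FreeF f a (FreeT f m a)) }

-- | Hypothetical helper: count how many functor layers are peeled before
-- reaching 'Pure'. It uses the pair functor ((,) t), so each layer has
-- exactly one continuation to follow.
depth :: Monad m => FreeT ((,) t) m a -> m Int
depth p = do
    step <- runFreeT p
    case step of
        Pure _         -> pure 0
        Free (_, rest) -> (+ 1) <$> depth rest
```

Every call to runFreeT runs effects in the underlying monad and exposes exactly one layer. Pattern matching on Pure and Free is then all it takes to decide what to do next.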
This does look a little complicated, and on the face of it, it can be a bit intimidating. And why is there a second internal data type?
Well, you can think of FreeF f a b as being a fancy version of Either a (f b). And the implementation of FreeT is saying that FreeT f m a is an m-action that produces Either a (f (FreeT f m a)). So for example, FreeT f IO a is an IO action that produces either the a (we’re done, end here!) or an f (FreeT f IO a) (we have to handle an f here!).
At the top level, FreeT is an action in the underlying monad (just like StateT, etc.). Let’s take that into account and write our implementation (with a hefty bit of help from the typechecker and typed holes)! Remember our plan: for f .| g, start unrolling g until it needs anything, and then ask f when it does.
    (.|)
        :: Monad m
        => Pipe a b m x         -- ^ pipe from a -> b
        -> Pipe b c m y         -- ^ pipe from b -> c
        -> Pipe a c m y         -- ^ pipe from a -> c
    pf .| pg = do
        gRes <- lift $ runFreeT pg          -- 1
        case gRes of
          Pure x            -> pure x       -- 2
          Free (YieldF o x) -> do           -- 3
            yield o
            pf .| x
          Free (AwaitF g  ) -> do           -- 4
            fRes <- lift $ runFreeT pf
            case fRes of
              Pure _            -> pure () .| g Nothing     -- 5
              Free (YieldF o y) -> y       .| g (Just o)    -- 6
              Free (AwaitF f  ) -> do                       -- 7
                i <- await
                f i .| FreeT (pure gRes)
Here are some numbered notes and comments:
1. Start unrolling the downstream pipe pg, in the underlying monad m.
2. If pg produced Pure x, it means we’re done pulling anything. The entire pipe has terminated, since we will never need anything again. So just quit out with pure x.
3. If pg produced Free (YieldF o x), it means it’s yielding an o and continuing on with x. So let’s just yield that o and move on to the composition of pf with the next pipe x.
4. If pg produced Free (AwaitF g), now things get interesting. We need to unroll pf until it yields some Maybe b, and feed that to g :: Maybe b -> Pipe b c m y.
5. If pf produced Pure y, that means it was done! The upstream terminated, so the downstream will have to terminate as well. So g gets a Nothing, and we move from there. Note we have to compose with a dummy pipe pure () to make the types match up properly.
6. If pf produced YieldF o y, then we have found our match! So give g (Just o), and now we recursively compose the next pipe (y) with the pipe that g gave us.
7. If pf produced AwaitF f, then we’re in a bind, aren’t we? We now have two layers waiting for something further upstream. So, we await from even further upstream; when we get it, we feed it to f and then compose f i :: Pipe a b m x with pg’s result (wrapping up gRes back into a Pipe so the types match up).
Admittedly (!) this is the “ugly” part of this derivation: sometimes we just can’t get everything for free. But getting the Monad, Applicative, Functor, MonadTrans, etc. instances is probably nice enough to justify this inconvenience :) And who knows, there might be a free structure that I don’t know about that gives us all of these plus piping for free.
    $ cat testpipefile.txt
    hello
    world
    STOP
    okay
    goodbye
Smooth as silk :D
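To see the pull-based behaviour without touching the file system, here is a self-contained sketch that swaps the file handle for an in-memory list and records every effect in an IORef. PipeF, Pipe, yield, await, runPipe and (.|) follow the definitions discussed in this post and assume the free package. The fixity declaration, sourceList, sinkLog and demo are hypothetical additions made only for this illustration.

```haskell
{-# LANGUAGE DeriveFunctor #-}
{-# LANGUAGE LambdaCase #-}

import Control.Monad.IO.Class (MonadIO, liftIO)
import Control.Monad.Trans.Class (lift)
import Control.Monad.Trans.Free (FreeF (..), FreeT (..), iterT, liftF)
import Data.Char (toUpper)
import Data.IORef (IORef, modifyIORef', newIORef, readIORef)
import Data.Void (Void, absurd)

data PipeF i o a = YieldF o a | AwaitF (Maybe i -> a)
    deriving Functor

type Pipe i o = FreeT (PipeF i o)

yield :: Monad m => o -> Pipe i o m ()
yield x = liftF (YieldF x ())

await :: Monad m => Pipe i o m (Maybe i)
await = liftF (AwaitF id)

runPipe :: Monad m => Pipe () Void m a -> m a
runPipe = iterT $ \case
    YieldF o _ -> absurd o
    AwaitF f   -> f (Just ())

-- Assumed fixity so that chains group to the right.
infixr 2 .|
(.|) :: Monad m => Pipe a b m x -> Pipe b c m y -> Pipe a c m y
pf .| pg = do
    gRes <- lift (runFreeT pg)
    case gRes of
      Pure x            -> pure x
      Free (YieldF o x) -> yield o >> (pf .| x)
      Free (AwaitF g)   -> do
        fRes <- lift (runFreeT pf)
        case fRes of
          Pure _            -> pure () .| g Nothing
          Free (YieldF o y) -> y .| g (Just o)
          Free (AwaitF f)   -> do
            i <- await
            f i .| FreeT (pure gRes)

-- | Hypothetical source: yields each item, logging a "read" just before.
sourceList :: MonadIO m => IORef [String] -> [String] -> Pipe i String m ()
sourceList logRef = mapM_ $ \x -> do
    liftIO (modifyIORef' logRef (("read " ++ x) :))
    yield x

untilSTOP :: Monad m => Pipe String String m ()
untilSTOP = await >>= \case
    Just x | x /= "STOP" -> yield x >> untilSTOP
    _                    -> pure ()

toUpperPipe :: Monad m => Pipe String String m ()
toUpperPipe = await >>= \case
    Nothing -> pure ()
    Just x  -> yield (map toUpper x) >> toUpperPipe

-- | Hypothetical sink: logs each item it receives instead of printing it.
sinkLog :: MonadIO m => IORef [String] -> Pipe String o m ()
sinkLog logRef = await >>= \case
    Nothing -> pure ()
    Just x  -> liftIO (modifyIORef' logRef (("emit " ++ x) :)) >> sinkLog logRef

-- | Run the whole chain and return the effect log in chronological order.
demo :: IO [String]
demo = do
    logRef <- newIORef []
    runPipe $ sourceList logRef ["hello", "world", "STOP", "okay", "goodbye"]
           .| untilSTOP
           .| toUpperPipe
           .| sinkLog logRef
    reverse <$> readIORef logRef
```

The log from demo interleaves reads and emits one item at a time, and its last entry is "read STOP": once untilSTOP terminates, the source is never unrolled again, so "okay" and "goodbye" are never read.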
Takeaways for a Happy New Year
Most of this post was thought up when I needed a tool that was sort of like conduit, sort of like pipes, sort of like the other libraries…and I thought I had to read up on the theory of pipes and iteratees and trampolines and fancy pants math stuff to be able to make anything useful in this space. I remember being very discouraged when I read about this stuff as a wee new Haskeller, because the techniques seemed so foreign and out of the range of my normal Haskell experience.
However, I found a way to maintain a level head somehow, and just thought: “ok, I just need a monad (trans) with two primitive actions: await, and yield. Why don’t I just make an await and yield and get automatic Monad and MonadTrans instances with the appropriate free structure?”
As we can see…this works just fine! We only needed to implement one extra thing (.|) to get the interface of our dreams. Of course, for a real industrial-strength streaming combinator library, we might need to be a bit more careful. But for my learning experience and use case, it worked perfectly.
The next time you need to make some monad that might seem exotic, try this out and see if it works for you :)
Happy holidays, and merry Christmas!
Click on the links in the corner of the text boxes for solutions! (or just check out the source file)
A Pipe i o m a “takes” i and “produces” o, so it should make sense to make pre-map and post-map functions:
That pre-maps all inputs the pipe would receive, and post-maps all of the values it yields.
Hint: This actually is made a lot simpler to write with the handy transFreeT combinator, which lets you swap out/change the base functor:
We could then write pre-map and post-map functions on PipeF and translate them to Pipe using transFreeT:
    -- source: https://github.com/mstksg/inCode/tree/master/code-samples/misc/streaming-combinators-free.hs#L140-L152
    postMapF :: (o -> o') -> PipeF i o a -> PipeF i o' a
    preMapF :: (i' -> i) -> PipeF i o a -> PipeF i' o a

    postMap :: Monad m => (o -> o') -> Pipe i o m a -> Pipe i o' m a
    postMap f = transFreeT (postMapF f)

    preMap :: Monad m => (i' -> i) -> Pipe i o m a -> Pipe i' o m a
    preMap f = transFreeT (preMapF f)
One staple of a streaming combinator system is giving you a disciplined way to handle resource allocations like file handles and properly close them on completion. Our streaming combinator system has no inherent way of doing this within its structure, but we can take advantage of the resourcet package to handle it for us.
Basically, if we run our pipes over ResourceT IO instead of normal IO, we get an extra action:
We can use this in our pipe to open a handle from a filename, and rest assured that the file handle will be closed when we eventually runResourceT :: ResourceT IO a -> IO a our pipe.
    -- source: https://github.com/mstksg/inCode/tree/master/code-samples/misc/streaming-combinators-free.hs#L155-L165
    sourceFile :: MonadIO m => FilePath -> Pipe i String (ResourceT m) ()

    samplePipe2 :: FilePath -> Pipe i o (ResourceT IO) ()
    samplePipe2 fp =
           sourceFile fp
        .| untilSTOP
        .| toUpperPipe
        .| hoistFreeT lift sinkStdout
Let’s say we modified our PipeF slightly to take another parameter u, the result type of the upstream pipe.
await would be fed i things yielded from upstream, but sometimes you’d get a Left indicating that the upstream pipe has terminated.
What would be the implications if
What could you do in a CertainPipe i o m a that you couldn’t normally do with our Pipe i o m a?
We mentioned earlier that a “source” could have type
Source o m a would be something that keeps on pumping out os as much as we need, without requiring any upstream input.
This is actually the essential behavior of the (true) list monad transformer, as espoused by the list-transformer package.
In that package, ListT is defined as:
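The definition in list-transformer is essentially the following pair of mutually recursive types. They are re-declared locally here so the sketch loads without the package; fromList and takeListT are hypothetical helpers (not the package’s API) that show the on-demand behaviour.

```haskell
-- | A monadic action that produces the next step of the list.
newtype ListT m a = ListT { next :: m (Step m a) }

-- | Either one more element plus the rest of the list, or the end.
data Step m a = Cons a (ListT m a) | Nil

-- | Hypothetical helper: build a ListT from an ordinary list.
fromList :: Monad m => [a] -> ListT m a
fromList []       = ListT (pure Nil)
fromList (x : xs) = ListT (pure (Cons x (fromList xs)))

-- | Hypothetical helper: pull at most n elements, running only the
-- effects needed to produce them.
takeListT :: Monad m => Int -> ListT m a -> m [a]
takeListT n l
    | n <= 0    = pure []
    | otherwise = do
        step <- next l
        case step of
            Nil       -> pure []
            Cons x l' -> (x :) <$> takeListT (n - 1) l'
```

The shape mirrors FreeT closely: each call to next runs one action in the underlying monad and exposes one layer, which is either the end of the stream or an element together with the rest.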
And it’s a type that can yield out new values of type a on demand, until exhausted.
Source o m () is equivalent to ListT m o. Write the functions to convert between them! :D
Unfortunately we cannot use iterT because the last type parameter of each is different. But manual pattern matching (like how we wrote (.|)) isn’t too bad!
The semantics of the ListT API is that x <|> y will “do” (and emit the result of) x before moving on to what y would emit. And empty is the ListT that signals it is done producing.
ListT are roughly analogous to
I am very humbled to be supported by an amazing community, who make it possible for me to devote time to researching and writing these posts. Very special thanks to my supporter at the “Amazing” level on patreon, Josh Vera! :)