Roll your own Holly Jolly streaming combinators with Free
by Justin Le ♦
Hi! Welcome, if you’re joining us from the great Advent of Haskell 2020 event! Feel free to grab a hot chocolate and sit back by the fireplace. I’m honored to be able to be a part of the event this year; it’s a great initiative and harkens back to the age-old Haskell tradition of bite-sized Functional Programming “advent calendars”. I remember when I was first learning Haskell, Ollie Charles’ 24 Days of Hackage series was one of my favorite series that helped me really get into the exciting world of Haskell and all the doors that functional programming can open.
All of the posts this year have been great: they range from insightful reflections on the nature of Haskell and programming in Haskell to explorations of specific language features. This post is going to be one of the “project-based” ones, where we walk through and introduce a solidly intermediate Haskell technique as it applies to building a useful general toolset. I’m going to be exploring the “functor combinator style”, where you identify the interface you want, associate it with a common Haskell typeclass, pick your primitives, and automatically get the ability to imbue your primitives with the structure you need. I’ve talked about this previously with:
- Applicative regular expressions
- The functor combinatorpedia
- Bidirectional serializers
- Composable interpreters
and I wanted to share a recent application where just thinking about the primitives gave me almost all the functionality I needed for a type: composable streaming combinators. This application also integrates well with any composable effects system, since it’s essentially a monadic interface.
In a way, this post could also be seen as capturing the spirit of the holidays by reminiscing about the days of yore: looking back at one of the more exciting times in modern Haskell’s development, when competing composable streaming libraries were at the forefront of practical innovation. The dust has settled on that a bit, but every time I think about composable streaming combinators, I do get a bit nostalgic :)
This post is written for an intermediate Haskell audience, and will assume you have a familiarity with monads and monadic interfaces, and also a little bit of experience with monad transformers. Note: there are many ways to arrive at the same result, but this post is more of a demonstration of a certain style and approach that has benefited me greatly in the past.
All of the code on this page can be found online on GitHub!
Dreaming of an Effectful Christmas
The goal here is to make a system of composable pipes that are “pull-based”, so we can process data as it is read in from IO only as we need it, and never do more work than we need to do up-front or leak memory when we stop using it.
So, the way I usually approach things like these is: “dress for the interface you want, not the one you have.” It involves:
- Thinking of the m a you want and how you would want to combine it/use it.
- Expressing the primitive actions of that thing.
- Using some sort of free structure or effects system to enhance those primitives with the interface you are looking for.
So, let’s imagine our type!
type Pipe i o m a = ...
where a Pipe i o m a represents a pipe component where:
- i: the type of the input the pipe expects from upstream
- o: the type of the output the pipe will be yielding downstream
- m: the monad that the underlying actions live in
- a: the overall result of the pipe once it has terminated.
One nice thing about this setup is that by picking different values for the type parameters, we can already get a nice classification for interesting subtypes:
- If i is () (or universally quantified¹), as in Pipe () o m a, it means that the pipe doesn’t ever expect any sort of information from upstream, and so can be considered a “source” that keeps on churning out values.
- If o is Void (or universally quantified), as in Pipe i Void m a, it means that the pipe will never yield anything downstream, because Void (declared as data Void, with no constructors) has no inhabitants that could possibly be yielded. This means that it acts like a “sink” that will keep on eating i values without ever outputting anything downstream.
- If i is () and o is Void (or they are both universally quantified), then the pipe doesn’t expect any sort of information from upstream, and also won’t ever yield anything downstream… a Pipe () Void m a is just an m a! In the biz, we often call this an “effect”.
- If a is Void (or universally quantified), as in Pipe i o m Void, it means that the pipe will never terminate, since Void has no inhabitants that it could possibly produce upon termination.
To me, this embodies a lot of the nice principles of the “algebra” of types that can be used to reason about inputs and outputs. Plus, it allows us to unify sources, sinks, and non-terminating pipes all in one type!
Now let’s think of the interface we want. We want to be able to:
-- | Yield a value `o` downstream
yield :: o -> Pipe i o m ()
-- | Await a value `i` upstream
await :: Pipe i o m (Maybe i)
-- | Terminate immediately with a result value
return :: a -> Pipe i o m a
-- | Sequence pipes one-after-another:
-- "do this until it terminates, then that one next"
(>>) :: Pipe i o m a -> Pipe i o m b -> Pipe i o m b
-- | In fact let's just make it a full fledged monad, why not? We're designing
-- our dream interface here.
(>>=) :: Pipe i o m a -> (a -> Pipe i o m b) -> Pipe i o m b
-- | A pipe that simply does action in the underlying monad and terminates with
-- the result
lift :: m a -> Pipe i o m a
-- | Compose pipes, linking the output of one to the input of the other
(.|) :: Pipe i j m a -> Pipe j o m b -> Pipe i o m b
-- | Finally: run it all on a pipe expecting no input and never yielding:
runPipe :: Pipe () Void m a -> m a
This looks like a complicated list… but actually most of these come from ubiquitous Haskell typeclasses like Monad and Applicative. We’ll see how this comes into play later, when we learn how to get these instances for our types for free. This makes the actual “work” we have to do very small.
So, we are going to be implementing “conduit-style” streaming combinators, where streaming actions are monadic, and monadic sequencing represents “do this after this one is done.” Because of this property, they work well as pull-based pipes: yields will block until a corresponding await can accept what is yielded.
Put on those Christmas Sweaters
“Dress for the interface you want, not the one you have”. So let’s pretend we already implemented this interface…what could we do with it?
Well, we can write simple sources like “yield the contents of a file line-by-line”:
-- source: https://github.com/mstksg/inCode/tree/master/code-samples/misc/streaming-combinators-free.hs#L65-L72
sourceHandleIO :: Handle -> Pipe i String IO ()
sourceHandleIO handle = do
    res <- lift $ tryJust (guard . isEOFError) (hGetLine handle)
    case res of
      Left  _   -> return ()              -- hit end of file: terminate
      Right out -> do
        yield out                         -- pass the line downstream...
        sourceHandleIO handle             -- ...then loop for the next one
Note that because the i is universally quantified, we know that sourceHandleIO never ever awaits or touches any input: it’s purely a source.
We can even write a simple sink, like “await and print the results to stdout as they come”:
-- source: https://github.com/mstksg/inCode/tree/master/code-samples/misc/streaming-combinators-free.hs#L83-L90
sinkStdoutIO :: Pipe String o IO ()
sinkStdoutIO = do
    inp <- await
    case inp of
      Nothing -> pure ()                  -- upstream is done: terminate
      Just x  -> do
        lift $ putStrLn x
        sinkStdoutIO
And maybe we can write a pipe that takes input strings and converts them to all capital letters and re-yields them:
-- source: https://github.com/mstksg/inCode/tree/master/code-samples/misc/streaming-combinators-free.hs#L101-L108
toUpperPipe :: Monad m => Pipe String String m ()
toUpperPipe = do
    inp <- await
    case inp of
      Nothing -> pure ()
      Just x  -> do
        yield (map toUpper x)
        toUpperPipe
And we can maybe write a pipe that stops as soon as it reads the line STOP:
-- source: https://github.com/mstksg/inCode/tree/master/code-samples/misc/streaming-combinators-free.hs#L110-L119
untilSTOP :: Monad m => Pipe String String m ()
untilSTOP = do
    inp <- await
    case inp of
      Nothing -> pure ()
      Just x
        | x == "STOP" -> pure ()
        | otherwise   -> do
            yield x
            untilSTOP
untilSTOP is really sort of the crux of what makes these streaming systems useful: we only pull items from the file as we need them, and untilSTOP will stop pulling anything as soon as we hit STOP, so no more IO will happen, even though the upstream source does IO.
Our Ideal Program
Now ideally, we’d want to write a program that lets us compose the above pipes to read from a file and output its contents to stdout, until it sees a STOP line:
-- source: https://github.com/mstksg/inCode/tree/master/code-samples/misc/streaming-combinators-free.hs#L121-L126
samplePipeIO :: Handle -> Pipe i o IO ()
samplePipeIO handle =
       sourceHandleIO handle
    .| untilSTOP
    .| toUpperPipe
    .| sinkStdoutIO
Setting up our Stockings
Step 2 of our plan was to identify the primitive actions we want. Looking at our interface, it seems like the few things that let us “create” a Pipe from scratch (instead of combining existing ones) are:
yield :: o -> Pipe i o m ()
await :: Pipe i o m (Maybe i)
lift :: m a -> Pipe i o m a
return :: a -> Pipe i o m a
However, we can note that lift and return can be gained just from having Monad and MonadTrans instances. So let’s assume we have those instances.
-- (simplified: only the relevant methods are shown)
class Monad m where
    return :: a -> m a
class MonadTrans p where
    lift :: Monad m => m a -> p m a
The functor combinator plan is to identify your primitives, and let free structures give you the instances (in our case, Monad and MonadTrans) you need for them.
So this means we only need two primitives: yield and await. Then we just throw them into some machinery that gives us a free Monad and MonadTrans structure, and we’re golden :)
In the style of the free library, we’d write a base functor: an ADT that describes the primitive actions:
-- source: https://github.com/mstksg/inCode/tree/master/code-samples/misc/streaming-combinators-free.hs#L22-L25
data PipeF i o a =
      YieldF o a
    | AwaitF (Maybe i -> a)
  deriving Functor
The general structure of the base functor style is to represent each primitive as a constructor: include any inputs, and then a continuation on what to do if you had the result.
For example:
- For YieldF, you need an o to be able to yield. The second field should really be the continuation () -> a, since the result is (), but that’s equivalent to a in Haskell.
- For AwaitF, you don’t need any parameters to await, but the continuation is Maybe i -> a, since you need to specify how to handle the Maybe i result.
(This is specifically the structure that free expects, but this principle can be ported to any algebraic effects system.)
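For the curious, here is roughly what deriving Functor gives us for PipeF, written out by hand as a standalone sketch (no libraries needed; the comments are my own annotations). It shows the pattern described above: inputs like the yielded o are left alone, and fmap only reaches into the continuation.

```haskell
-- PipeF with a hand-written Functor instance, equivalent in spirit to
-- the `deriving Functor` clause used above.
data PipeF i o a
    = YieldF o a              -- value to yield, then "what to do next"
    | AwaitF (Maybe i -> a)   -- "what to do next", given the awaited input

instance Functor (PipeF i o) where
    -- YieldF's continuation is a plain value: apply f to it directly.
    fmap f (YieldF o next) = YieldF o (f next)
    -- AwaitF's continuation is a function: post-compose f onto it.
    fmap f (AwaitF k)      = AwaitF (f . k)
```

Note that fmap never touches the o being yielded or the Maybe i being awaited; it only changes what happens afterward, which is exactly the hook a free structure needs to chain steps together.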
A Christmas Surprise
And now for the last ingredient: we can use the FreeT type from Control.Monad.Trans.Free, and now we have our pipe interface, with Monad and MonadTrans instances!
type Pipe i o = FreeT (PipeF i o)
This takes our base functor and imbues it with full Monad and MonadTrans instances:
lift :: m a -> FreeT (PipeF i o) m a
lift :: m a -> Pipe i o m a
return :: a -> FreeT (PipeF i o) m a
return :: a -> Pipe i o m a
(>>) :: Pipe i o m a -> Pipe i o m b -> Pipe i o m b
(>>=) :: Pipe i o m a -> (a -> Pipe i o m b) -> Pipe i o m b
That’s the essence of the free structure: it adds to our base functor (PipeF) exactly the structure it needs to be able to implement the instances it is free on. And it’s all free as in beer! :D
As a bonus gift, we also get a MonadIO instance from FreeT:
liftIO :: MonadIO m => IO a -> FreeT (PipeF i o) m a
liftIO :: MonadIO m => IO a -> Pipe i o m a
Now we just need our functions to lift our primitives to Pipe, using liftF :: f a -> FreeT f m a:
-- source: https://github.com/mstksg/inCode/tree/master/code-samples/misc/streaming-combinators-free.hs#L29-L33
yield :: Monad m => o -> Pipe i o m ()
yield x = liftF $ YieldF x ()
await :: Monad m => Pipe i o m (Maybe i)
await = liftF $ AwaitF id
(These things you can usually just fill in using type tetris, filling in values with typed holes until they typecheck.)
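To make liftF a bit less magical, here is a self-contained sketch (assuming the free package) that peels exactly one layer off yield and await with runFreeT, running over Identity. describeStep is a hypothetical helper for illustration, not part of the post’s source.

```haskell
{-# LANGUAGE DeriveFunctor #-}

import Control.Monad.Trans.Free (FreeF (..), FreeT (..), liftF)
import Data.Functor.Identity    (Identity (..))

data PipeF i o a
    = YieldF o a
    | AwaitF (Maybe i -> a)
  deriving Functor

type Pipe i o = FreeT (PipeF i o)

yield :: Monad m => o -> Pipe i o m ()
yield x = liftF $ YieldF x ()

await :: Monad m => Pipe i o m (Maybe i)
await = liftF $ AwaitF id

-- Hypothetical helper: run one step of a pipe over Identity and report
-- which constructor (if any) it stopped at.
describeStep :: Show o => Pipe i o Identity a -> String
describeStep p = case runIdentity (runFreeT p) of
    Pure _            -> "done"
    Free (YieldF o _) -> "yield " ++ show o
    Free (AwaitF _)   -> "await"
```

For example, describeStep (yield 'a' :: Pipe () Char Identity ()) evaluates to "yield 'a'", and the continuation stored inside that YieldF is already finished: liftF produces a single step that ends in pure ().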
Note that all of the individual pipes we had planned work as-is! And we can even make sourceHandle and sinkStdout work for any MonadIO m => Pipe i o m a, because of the unexpected surprise Christmas gift we got (the MonadIO instance and liftIO :: MonadIO m => IO a -> Pipe i o m a). Remember, MonadIO m is basically any m that supports doing IO.
-- source: https://github.com/mstksg/inCode/tree/master/code-samples/misc/streaming-combinators-free.hs#L74-L119
sourceHandle :: MonadIO m => Handle -> Pipe i String m ()
sourceHandle handle = do
    res <- liftIO $ tryJust (guard . isEOFError) (hGetLine handle)
    case res of
      Left  _   -> return ()
      Right out -> do
        yield out
        sourceHandle handle
sinkStdout :: MonadIO m => Pipe String o m ()
sinkStdout = do
    inp <- await
    case inp of
      Nothing -> pure ()
      Just x  -> do
        liftIO $ putStrLn x
        sinkStdout
toUpperPipe :: Monad m => Pipe String String m ()
toUpperPipe = do
    inp <- await
    case inp of
      Nothing -> pure ()
      Just x  -> do
        yield (map toUpper x)
        toUpperPipe
untilSTOP :: Monad m => Pipe String String m ()
untilSTOP = do
    inp <- await
    case inp of
      Nothing -> pure ()
      Just x
        | x == "STOP" -> pure ()
        | otherwise   -> do
            yield x
            untilSTOP
That’s because using FreeT, we imbue the structure required to do monadic chaining (do notation), MonadTrans (lift), and MonadIO (liftIO) for free!
To “run” our pipes, we can use FreeT’s “interpreter” function. This follows the same pattern as for many free structures: specify how to handle each individual base functor constructor, and it then gives you a handler for the entire thing.
iterT :: (PipeF i o (m a) -> m a)   -- ^ given a way to handle each base functor constructor ...
      -> Pipe i o m a -> m a        -- ^ here's a way to handle the whole thing
So let’s write our base functor handler. Remember that we established earlier that we can only “run” a Pipe () Void m a: that is, pipes where await can always be fed with no information (()) and no yield is ever called (because you cannot yield with Void, a type with no inhabitants). We can directly translate this to how we handle each constructor:
-- source: https://github.com/mstksg/inCode/tree/master/code-samples/misc/streaming-combinators-free.hs#L57-L60
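-- (uses the LambdaCase extension)
handlePipeF :: PipeF () Void (m a) -> m a
handlePipeF = \case
    YieldF o _ -> absurd o        -- impossible: there is no value of type Void
    AwaitF f   -> f (Just ())     -- always feed () to an await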
And so we get our full runPipe:
-- source: https://github.com/mstksg/inCode/tree/master/code-samples/misc/streaming-combinators-free.hs#L62-L63
runPipe :: Monad m => Pipe () Void m a -> m a
runPipe = iterT handlePipeF
I think this process exemplifies most of the major beats when working with free structures:
- Define the base functor
- Allow the free structure to imbue the proper structure over your base functor
- Write your interpreter to interpret the constructors of your base functor, and the free structure will give you a way to interpret the entire structure.
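Putting these pieces together, here is a self-contained sketch of runPipe in action (assuming the free and transformers packages). countAwaits is a hypothetical example of an “effect” pipe that uses lift to count its awaits in a State Int.

```haskell
{-# LANGUAGE DeriveFunctor #-}
{-# LANGUAGE LambdaCase    #-}

import Control.Monad                    (replicateM)
import Control.Monad.Trans.Class        (lift)
import Control.Monad.Trans.Free         (FreeT, iterT, liftF)
import Control.Monad.Trans.State.Strict (State, modify, runState)
import Data.Functor.Identity            (Identity (..))
import Data.Void                        (Void, absurd)

data PipeF i o a
    = YieldF o a
    | AwaitF (Maybe i -> a)
  deriving Functor

type Pipe i o = FreeT (PipeF i o)

await :: Monad m => Pipe i o m (Maybe i)
await = liftF $ AwaitF id

-- Interpret one layer: yielding is impossible (Void), and every await
-- succeeds with ().
handlePipeF :: PipeF () Void (m a) -> m a
handlePipeF = \case
    YieldF o _ -> absurd o
    AwaitF f   -> f (Just ())

runPipe :: Monad m => Pipe () Void m a -> m a
runPipe = iterT handlePipeF

-- Hypothetical "effect" pipe: awaits twice, bumping a counter in the
-- underlying State monad after each await.
countAwaits :: Pipe () Void (State Int) [Maybe ()]
countAwaits = replicateM 2 (await <* lift (modify (+ 1)))
```

With these definitions, runState (runPipe countAwaits) 0 evaluates to ([Just (),Just ()],2): every await is answered with Just (), and the lifted State actions run in order.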
The Final Ornament
If you look at the list of all the things we wanted, we’re still missing one thing: pipe composition/input-output chaining. That’s because it isn’t a primitive operation (like yield or await), and it wasn’t given to us for free by our free structure (FreeT, which gave us monadic composition and monad transformer ability). So with how we have currently written it, there isn’t any way of getting around writing (.|) manually. So let’s roll up our sleeves and do the (admittedly minimal amount of) dirty work.
Let’s think about the semantics of our pipe chaining. We want to never do more work than we need to do, so we’ll be “pull-based”: for f .| g, try running g as much as possible until it awaits anything from f. Only then do we try running f.
To implement this, we’re going to have to dig in a little bit to the implementation/structure of FreeT:
newtype FreeT f m a = FreeT
    { runFreeT :: m (FreeF f a (FreeT f m a)) }
data FreeF f a b
    = Pure a
    | Free (f b)
This does look a little complicated, and on the face of it, it can be a bit intimidating. And why is there a second internal data type?
Well, you can think of FreeF f a b as being a fancy version of Either a (f b). And the implementation of FreeT is saying that FreeT f m a is an m-action that produces Either a (f (FreeT f m a)). So for example, FreeT f IO a is an IO action that produces either the a (we’re done, end here!) or an f (FreeT f IO a) (we have to handle an f here!):
-- (conceptually; not the literal definition)
newtype FreeT f m a = FreeT
    { runFreeT :: m (Either a (f (FreeT f m a))) }
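Read this way, we can already unroll a pipe by hand, one runFreeT at a time. As a sketch (assuming the free package; drainSource and countdown are hypothetical names), here is a function that steps through a source and collects everything it yields, answering any await with Just (), just like handlePipeF does.

```haskell
{-# LANGUAGE DeriveFunctor #-}

import Control.Monad.Trans.Free (FreeF (..), FreeT (..), liftF)
import Data.Functor.Identity    (Identity (..))

data PipeF i o a = YieldF o a | AwaitF (Maybe i -> a) deriving Functor
type Pipe i o = FreeT (PipeF i o)

yield :: Monad m => o -> Pipe i o m ()
yield x = liftF $ YieldF x ()

-- Hypothetical helper: unroll a source one FreeT layer at a time.
-- Each runFreeT runs an action in m and hands back either Pure (done)
-- or Free (one PipeF layer that we must handle).
drainSource :: Monad m => Pipe () o m a -> m ([o], a)
drainSource p = do
    step <- runFreeT p
    case step of
      Pure a            -> pure ([], a)                -- terminated
      Free (YieldF o k) -> do                          -- yielded o, then k
        (os, a) <- drainSource k
        pure (o : os, a)
      Free (AwaitF f)   -> drainSource (f (Just ()))   -- answer with ()

-- Hypothetical source: yields n, n-1, ..., 1, then finishes with a string.
countdown :: Monad m => Int -> Pipe i Int m String
countdown n
    | n <= 0    = pure "liftoff"
    | otherwise = yield n >> countdown (n - 1)
```

For instance, runIdentity (drainSource (countdown 3)) evaluates to ([3,2,1],"liftoff"). This is the same kind of unrolling that (.|) performs, except that (.|) interleaves two pipes.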
At the top level, FreeT is an action in the underlying monad (just like MaybeT, ExceptT, StateT, etc.). Let’s take that into account and write our implementation (with a hefty bit of help from the typechecker and typed holes)! Remember our plan: for f .| g, start unrolling g until it needs anything, and then ask f when it does.
(.|)
    :: Monad m
    => Pipe a b m x         -- ^ pipe from a -> b
    -> Pipe b c m y         -- ^ pipe from b -> c
    -> Pipe a c m y         -- ^ pipe from a -> c
pf .| pg = do
    gRes <- lift $ runFreeT pg                        -- 1
    case gRes of
      Pure x            -> pure x                     -- 2
      Free (YieldF o x) -> do                         -- 3
        yield o
        pf .| x
      Free (AwaitF g  ) -> do                         -- 4
        fRes <- lift $ runFreeT pf
        case fRes of
          Pure _            -> pure () .| g Nothing   -- 5
          Free (YieldF o y) -> y       .| g (Just o)  -- 6
          Free (AwaitF f  ) -> do                     -- 7
            i <- await
            f i .| FreeT (pure gRes)
Here are some numbered notes and comments:
1. Start unrolling the downstream pipe pg, in the underlying monad m!
2. If pg produced Pure x, it means we’re done pulling anything. The entire pipe has terminated, since we will never need anything again. So just quit out with pure x.
3. If pg produced Free (YieldF o x), it means it’s yielding an o and continuing on with x. So let’s just yield that o and move on to the composition of pf with the next pipe x.
4. If pg produced Free (AwaitF g), now things get interesting. We need to unroll pf until it yields some b (or terminates), and feed the resulting Maybe b to g :: Maybe b -> Pipe b c m y.
5. If pf produced Pure _, that means it was done! The upstream terminated, so the downstream will never receive any more input: g gets a Nothing, and we move on from there. Note that we have to compose with a dummy pipe pure () to make the types match up properly.
6. If pf produced YieldF o y, then we have found our match! So give g (Just o), and now we recursively compose the next pipe (y) with the pipe that g gave us.
7. If pf produced AwaitF f, then we’re in a bind, aren’t we? We now have two layers waiting for something further upstream. So, we await from even further upstream; when we get it, we feed it to f and then compose f i :: Pipe a b m x with pg’s result. We wrap gRes back up into a FreeT/Pipe with FreeT (pure gRes) so the types match up; reusing pg instead would run pg’s effects a second time.
Admittedly (!) this is the “ugly” part of this derivation: sometimes we just can’t get everything for free. But getting the Monad, Applicative, Functor, MonadTrans, etc. instances is probably nice enough to justify this inconvenience :) And who knows, there might be a free structure that I don’t know about that gives us all of these plus piping for free.
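To see the pull-based behavior concretely, here is a self-contained sketch (assuming the free and transformers packages) that composes an infinite source with a sink that only wants three values, running in a Writer so the order of events is recorded. naturals and takeK are hypothetical; (.|) and runPipe follow the definitions in this post, with runPipe’s handler inlined.

```haskell
{-# LANGUAGE DeriveFunctor #-}
{-# LANGUAGE LambdaCase    #-}

import Control.Monad.Trans.Class         (lift)
import Control.Monad.Trans.Free          (FreeF (..), FreeT (..), iterT, liftF)
import Control.Monad.Trans.Writer.Strict (Writer, runWriter, tell)
import Data.Void                         (Void, absurd)

data PipeF i o a = YieldF o a | AwaitF (Maybe i -> a) deriving Functor
type Pipe i o = FreeT (PipeF i o)

yield :: Monad m => o -> Pipe i o m ()
yield x = liftF $ YieldF x ()

await :: Monad m => Pipe i o m (Maybe i)
await = liftF $ AwaitF id

-- Pull-based composition: run downstream first, upstream only on demand.
(.|) :: Monad m => Pipe a b m x -> Pipe b c m y -> Pipe a c m y
pf .| pg = do
    gRes <- lift $ runFreeT pg
    case gRes of
      Pure x            -> pure x
      Free (YieldF o x) -> yield o >> (pf .| x)
      Free (AwaitF g)   -> do
        fRes <- lift $ runFreeT pf
        case fRes of
          Pure _            -> pure () .| g Nothing
          Free (YieldF o y) -> y .| g (Just o)
          Free (AwaitF f)   -> await >>= \i -> f i .| FreeT (pure gRes)

runPipe :: Monad m => Pipe () Void m a -> m a
runPipe = iterT $ \case
    YieldF o _ -> absurd o
    AwaitF f   -> f (Just ())

-- Hypothetical infinite source that logs each value before yielding it.
naturals :: Int -> Pipe i Int (Writer [String]) ()
naturals n = do
    lift $ tell ["produce " ++ show n]
    yield n
    naturals (n + 1)

-- Hypothetical sink that pulls at most k values, logging each one.
takeK :: Int -> Pipe Int o (Writer [String]) [Int]
takeK 0 = pure []
takeK k = await >>= \case
    Nothing -> pure []
    Just x  -> do
        lift $ tell ["consume " ++ show x]
        (x :) <$> takeK (k - 1)
```

The recorded log alternates between “produce n” and “consume n” and ends at “consume 2”: because the upstream only runs when the downstream awaits, the infinite source is stepped exactly three times and runPipe terminates.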
Christmas Miracle
It runs!
-- source: https://github.com/mstksg/inCode/tree/master/code-samples/misc/streaming-combinators-free.hs#L128-L133
samplePipe :: Handle -> Pipe i o IO ()
samplePipe handle =
       sourceHandle handle
    .| untilSTOP
    .| toUpperPipe
    .| sinkStdout
$ cat testpipefile.txt
hello
world
STOP
okay
goodbye
ghci> withFile "testpipefile.txt" ReadMode $ \handle ->
        runPipe (samplePipe handle)
-- HELLO
-- WORLD
Smooth as silk :D
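If you’d rather not create a file, here is a pure sketch of the same pipeline (assuming the free and transformers packages), with the file replaced by an in-memory list and stdout replaced by a Writer log. sourceListLogged and sinkTell are hypothetical stand-ins for sourceHandle and sinkStdout, and untilSTOP and toUpperPipe are condensed but equivalent versions of the ones above.

```haskell
{-# LANGUAGE DeriveFunctor #-}
{-# LANGUAGE LambdaCase    #-}

import Control.Monad.Trans.Class         (lift)
import Control.Monad.Trans.Free          (FreeF (..), FreeT (..), iterT, liftF)
import Control.Monad.Trans.Writer.Strict (Writer, execWriter, tell)
import Data.Char                         (toUpper)
import Data.Void                         (Void, absurd)

data PipeF i o a = YieldF o a | AwaitF (Maybe i -> a) deriving Functor
type Pipe i o = FreeT (PipeF i o)

yield :: Monad m => o -> Pipe i o m ()
yield x = liftF $ YieldF x ()

await :: Monad m => Pipe i o m (Maybe i)
await = liftF $ AwaitF id

(.|) :: Monad m => Pipe a b m x -> Pipe b c m y -> Pipe a c m y
pf .| pg = do
    gRes <- lift $ runFreeT pg
    case gRes of
      Pure x            -> pure x
      Free (YieldF o x) -> yield o >> (pf .| x)
      Free (AwaitF g)   -> do
        fRes <- lift $ runFreeT pf
        case fRes of
          Pure _            -> pure () .| g Nothing
          Free (YieldF o y) -> y .| g (Just o)
          Free (AwaitF f)   -> await >>= \i -> f i .| FreeT (pure gRes)

runPipe :: Monad m => Pipe () Void m a -> m a
runPipe = iterT $ \case
    YieldF o _ -> absurd o
    AwaitF f   -> f (Just ())

untilSTOP :: Monad m => Pipe String String m ()
untilSTOP = await >>= \case
    Just x | x /= "STOP" -> yield x >> untilSTOP
    _                    -> pure ()

toUpperPipe :: Monad m => Pipe String String m ()
toUpperPipe = await >>= \case
    Nothing -> pure ()
    Just x  -> yield (map toUpper x) >> toUpperPipe

-- Hypothetical stand-in for sourceHandle: yields list items, logging each read.
sourceListLogged :: [String] -> Pipe i String (Writer [String]) ()
sourceListLogged = mapM_ $ \x -> lift (tell ["read " ++ x]) >> yield x

-- Hypothetical stand-in for sinkStdout: "prints" into the Writer log.
sinkTell :: Pipe String o (Writer [String]) ()
sinkTell = await >>= \case
    Nothing -> pure ()
    Just x  -> lift (tell [x]) >> sinkTell

samplePipePure :: Pipe () Void (Writer [String]) ()
samplePipePure =
       sourceListLogged ["hello", "world", "STOP", "okay", "goodbye"]
    .| untilSTOP
    .| toUpperPipe
    .| sinkTell
```

Running execWriter (runPipe samplePipePure) gives ["read hello","HELLO","read world","WORLD","read STOP"]: okay and goodbye are never even read, because untilSTOP stops pulling at STOP.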
Takeaways for a Happy New Year
Most of this post was thought up when I needed² a tool that was sort of like conduit, sort of like pipes, sort of like the other libraries… and I thought I had to read up on the theory of pipes and iteratees and trampolines and fancy pants math stuff to be able to make anything useful in this space. I remember being very discouraged when I read about this stuff as a wee new Haskeller, because the techniques seemed so foreign and out of the range of my normal Haskell experience.
However, I found a way to maintain a level head somehow, and just thought: “ok, I just need a monad (trans) with two primitive actions: await and yield. Why don’t I just make an await and yield and get automatic Monad and MonadTrans instances with the appropriate free structure?”
As we can see… this works just fine! We only needed to implement one extra thing ((.|)) to get the interface of our dreams. Of course, for a real industrial-strength streaming combinator library, we might need to be a bit more careful. But for my learning experience and use case, it worked perfectly.
The next time you need to make some monad that might seem exotic, try this out and see if it works for you :)
Happy holidays, and merry Christmas!
Exercises
Solutions to these exercises can be found in the source file linked in the code samples!
A Pipe i o m a “takes” i and “produces” o, so it should make sense to make pre-map and post-map functions:
-- source: https://github.com/mstksg/inCode/tree/master/code-samples/misc/streaming-combinators-free.hs#L148-L151
postMap :: Monad m => (o -> o') -> Pipe i o m a -> Pipe i o' m a
preMap  :: Monad m => (i' -> i) -> Pipe i o m a -> Pipe i' o m a
preMap transforms every input the pipe would receive, and postMap transforms every value it yields.
Hint: This is actually made a lot simpler to write with the handy transFreeT combinator, which lets you swap out/change the base functor:
transFreeT
    :: (Monad m, Functor g)
    => (forall a. f a -> g a)                     -- ^ polymorphic function to edit the base functor
    -> FreeT f m b
    -> FreeT g m b
transFreeT
    :: Monad m
    => (forall a. PipeF i o a -> PipeF i' o' a)   -- ^ polymorphic function to edit the base functor
    -> Pipe i o m a
    -> Pipe i' o' m a
We could then write pre-map and post-map functions on PipeF and translate them to Pipe using transFreeT:
-- source: https://github.com/mstksg/inCode/tree/master/code-samples/misc/streaming-combinators-free.hs#L140-L152
postMapF :: (o -> o') -> PipeF i o a -> PipeF i o' a
preMapF  :: (i' -> i) -> PipeF i o a -> PipeF i' o a
postMap :: Monad m => (o -> o') -> Pipe i o m a -> Pipe i o' m a
postMap f = transFreeT (postMapF f)
preMap :: Monad m => (i' -> i) -> Pipe i o m a -> Pipe i' o m a
preMap f = transFreeT (preMapF f)
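One possible way to fill in postMapF and preMapF, as a sketch rather than the reference solution in the linked source (assuming the free package): post-mapping only touches YieldF, and pre-mapping only touches AwaitF, by pre-composing fmap f onto its continuation. echo and feed are hypothetical helpers for trying it out.

```haskell
{-# LANGUAGE DeriveFunctor #-}

import Control.Monad.Trans.Free (FreeF (..), FreeT (..), liftF, transFreeT)
import Data.Functor.Identity    (Identity (..))

data PipeF i o a = YieldF o a | AwaitF (Maybe i -> a) deriving Functor
type Pipe i o = FreeT (PipeF i o)

yield :: Monad m => o -> Pipe i o m ()
yield x = liftF $ YieldF x ()

await :: Monad m => Pipe i o m (Maybe i)
await = liftF $ AwaitF id

-- Post-mapping only changes what gets yielded.
postMapF :: (o -> o') -> PipeF i o a -> PipeF i o' a
postMapF f (YieldF o x) = YieldF (f o) x
postMapF _ (AwaitF g)   = AwaitF g

-- Pre-mapping converts each awaited i' into an i before the continuation sees it.
preMapF :: (i' -> i) -> PipeF i o a -> PipeF i' o a
preMapF _ (YieldF o x) = YieldF o x
preMapF f (AwaitF g)   = AwaitF (g . fmap f)

postMap :: Monad m => (o -> o') -> Pipe i o m a -> Pipe i o' m a
postMap f = transFreeT (postMapF f)

preMap :: Monad m => (i' -> i) -> Pipe i o m a -> Pipe i' o m a
preMap f = transFreeT (preMapF f)

-- Hypothetical helper: re-yield every input until upstream is exhausted.
echo :: Monad m => Pipe a a m ()
echo = await >>= maybe (pure ()) (\x -> yield x >> echo)

-- Hypothetical driver: feed a list of inputs and collect all outputs.
feed :: Monad m => [i] -> Pipe i o m a -> m [o]
feed ins p = do
    step <- runFreeT p
    case step of
      Pure _            -> pure []
      Free (YieldF o k) -> (o :) <$> feed ins k
      Free (AwaitF g)   -> case ins of
        []       -> feed [] (g Nothing)
        (x : xs) -> feed xs (g (Just x))
```

For example, runIdentity (feed [1, 2, 3 :: Int] (postMap show echo)) evaluates to ["1","2","3"].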
One staple of a streaming combinator system is giving you a disciplined way to handle resource allocations like file handles, and to properly close them on completion. Our streaming combinator system has no inherent way of doing this within its structure, but we can take advantage of the resourcet package to handle it for us.
Basically, if we run our pipes over ResourceT IO instead of normal IO, we get an extra action, allocate (shown here specialized to ResourceT IO):
allocate
    :: IO a                           -- ^ get a handle
    -> (a -> IO ())                   -- ^ close a handle
    -> ResourceT IO (ReleaseKey, a)
-- example
allocate (openFile fp ReadMode) hClose
    :: ResourceT IO (ReleaseKey, Handle)
We can use this in our pipe to open a handle from a filename, and rest assured that the file handle will be closed when we eventually run our pipe with runResourceT :: ResourceT IO a -> IO a.
-- source: https://github.com/mstksg/inCode/tree/master/code-samples/misc/streaming-combinators-free.hs#L155-L165
sourceFile :: MonadIO m => FilePath -> Pipe i String (ResourceT m) ()
samplePipe2 :: FilePath -> Pipe i o (ResourceT IO) ()
samplePipe2 fp =
       sourceFile fp
    .| untilSTOP
    .| toUpperPipe
    .| hoistFreeT lift sinkStdout
ghci> runResourceT . runPipe $ samplePipe2 "testpipefile.txt"
-- HELLO
-- WORLD
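A sketch of how sourceFile might be written (assuming the resourcet, free, and transformers packages; this is not necessarily the version in the linked source, and drain is a hypothetical test helper). allocate registers hClose with the surrounding ResourceT, and release closes the handle early when the file has been read to the end. If a downstream pipe like untilSTOP stops pulling first, the release line is never reached, and runResourceT closes the handle when the pipe finishes.

```haskell
{-# LANGUAGE DeriveFunctor #-}

import Control.Exception            (tryJust)
import Control.Monad                (guard)
import Control.Monad.IO.Class       (MonadIO, liftIO)
import Control.Monad.Trans.Class    (lift)
import Control.Monad.Trans.Free     (FreeF (..), FreeT (..), liftF)
import Control.Monad.Trans.Resource (ResourceT, allocate, release, runResourceT)
import System.IO                    (Handle, IOMode (ReadMode), hClose, hGetLine, openFile)
import System.IO.Error              (isEOFError)

data PipeF i o a = YieldF o a | AwaitF (Maybe i -> a) deriving Functor
type Pipe i o = FreeT (PipeF i o)

yield :: Monad m => o -> Pipe i o m ()
yield x = liftF $ YieldF x ()

-- sourceHandle as defined earlier in the post.
sourceHandle :: MonadIO m => Handle -> Pipe i String m ()
sourceHandle handle = do
    res <- liftIO $ tryJust (guard . isEOFError) (hGetLine handle)
    case res of
      Left  _   -> pure ()
      Right out -> yield out >> sourceHandle handle

-- Open the file inside ResourceT, so the handle is closed when
-- runResourceT finishes even if downstream stops pulling early.
sourceFile :: MonadIO m => FilePath -> Pipe i String (ResourceT m) ()
sourceFile fp = do
    (key, handle) <- lift $ allocate (openFile fp ReadMode) hClose
    sourceHandle handle
    lift $ release key   -- reached EOF: close the handle right away

-- Hypothetical test helper: collect every value a source yields.
drain :: Monad m => Pipe () o m a -> m [o]
drain p = do
    step <- runFreeT p
    case step of
      Pure _            -> pure []
      Free (YieldF o k) -> (o :) <$> drain k
      Free (AwaitF f)   -> drain (f (Just ()))
```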
Let’s say we modified our PipeF slightly to take another parameter u, the result type of the upstream pipe.
data PipeF i o u a =
      YieldF o a
    | AwaitF (Either u i -> a)
type Pipe i o u = FreeT (PipeF i o u)
await :: Monad m => Pipe i o u m (Either u i)
await = liftF $ AwaitF id
So now await would be fed the i values yielded from upstream, but sometimes you’d get a Left (carrying the upstream’s result u) indicating that the upstream pipe has terminated. What would be the implications if u is Void?
type CertainPipe i o = Pipe i o Void
What could you do in a CertainPipe i o m a that you couldn’t normally do with our Pipe i o m a?
We mentioned earlier that a “source” could have type
type Source = Pipe ()
And a Source o m a would be something that keeps on pumping out o values as much as we need, without requiring any upstream input. This is actually the essential behavior of the (true) list monad transformer, as exposed by the list-transformer package.
In that package, ListT is defined as:
newtype ListT m a = ListT { next :: m (Step m a) }
data Step m a = Cons a (ListT m a) | Nil
And it’s a type that can yield out new a values on-demand, until exhausted. In fact, Source o m () is equivalent to ListT m o. Write the functions to convert between them! :D
-- source: https://github.com/mstksg/inCode/tree/master/code-samples/misc/streaming-combinators-free.hs#L171-L179
toListT   :: Monad m => Pipe () o m a -> L.ListT m o
fromListT :: Monad m => L.ListT m o -> Pipe i o m ()
Unfortunately we cannot use iterT, because the last type parameter of each is different. But manual pattern matching (like how we wrote (.|)) isn’t too bad!
The semantics of the ListT API are that x <|> y will “do” (and emit the results of) x before moving on to what y would emit, and empty is the ListT that signals it is done producing. So <|>, pure, and empty for ListT are roughly analogous to >>, yield, and return for Source, respectively.
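One possible pair of conversions, as a sketch (assuming the list-transformer package imported qualified as L, as in the signatures above; listToListT and listTToList are hypothetical test helpers). Both directions are manual unrolling: toListT steps the pipe with runFreeT, and fromListT steps the ListT with next.

```haskell
{-# LANGUAGE DeriveFunctor #-}

import           Control.Monad.Trans.Class (lift)
import           Control.Monad.Trans.Free  (FreeF (..), FreeT (..), liftF)
import           Data.Functor.Identity     (Identity (..))
import qualified List.Transformer          as L

data PipeF i o a = YieldF o a | AwaitF (Maybe i -> a) deriving Functor
type Pipe i o = FreeT (PipeF i o)

yield :: Monad m => o -> Pipe i o m ()
yield x = liftF $ YieldF x ()

-- Step the pipe: each yield becomes a Cons cell, termination becomes Nil.
-- Awaits are answered with Just (), just like runPipe does.
toListT :: Monad m => Pipe () o m a -> L.ListT m o
toListT p = L.ListT $ do
    step <- runFreeT p
    case step of
      Pure _            -> pure L.Nil
      Free (YieldF o k) -> pure (L.Cons o (toListT k))
      Free (AwaitF f)   -> L.next (toListT (f (Just ())))

-- Step the ListT: each Cons cell becomes a yield, Nil ends the pipe.
fromListT :: Monad m => L.ListT m o -> Pipe i o m ()
fromListT xs = do
    step <- lift (L.next xs)
    case step of
      L.Nil         -> pure ()
      L.Cons o rest -> yield o >> fromListT rest

-- Hypothetical test helpers converting between plain lists and ListT.
listToListT :: Monad m => [a] -> L.ListT m a
listToListT []       = L.ListT (pure L.Nil)
listToListT (x : xs) = L.ListT (pure (L.Cons x (listToListT xs)))

listTToList :: Monad m => L.ListT m a -> m [a]
listTToList xs = do
    step <- L.next xs
    case step of
      L.Nil         -> pure []
      L.Cons a rest -> (a :) <$> listTToList rest
```

Since ListT has no notion of awaiting, toListT answers every await with Just () and keeps unrolling until the next yield or termination, mirroring handlePipeF.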
Special Thanks
I am very humbled to be supported by an amazing community, who make it possible for me to devote time to researching and writing these posts. Very special thanks to my supporter at the “Amazing” level on patreon, Josh Vera! :)