Source on GitHub: mdjnewman / talk-fp-in-scala-7-8
Purely functional parallelism & Property-based testing
Created by Matt Newman for the April 2016 BFPG meetup
These chapters should be like peering over the shoulder of someone as they think through possible designs.
These chapters look at the impact the fundamentals in Part 1 have on library design, by way of three examples
To start with, let's look at an example that we can refer back to in order to make things a little more concrete.
Summing a list of integers:
def sum(ints: Seq[Int]): Int = ints.foldLeft(0)((a,b) => a + b)
Same operation, using a divide-and-conquer algorithm:
```scala
def sum(ints: IndexedSeq[Int]): Int =
  if (ints.size <= 1)
    ints.headOption getOrElse 0
  else {
    val (l, r) = ints.splitAt(ints.length / 2)
    sum(l) + sum(r)
  }
```
(this implementation can be parallelised)
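A quick way to check that the two definitions agree is to run them side by side. In this sketch they are renamed `sumFold` and `sumSplit` (our names, not the talk's) so that both fit in one object; the recursive version is otherwise written as above.

```scala
object SumExamples {
  // Sequential left fold, as in the first definition.
  def sumFold(ints: Seq[Int]): Int =
    ints.foldLeft(0)((a, b) => a + b)

  // Divide and conquer: the two recursive calls are independent of each
  // other, which is what makes this version a candidate for parallelism.
  def sumSplit(ints: IndexedSeq[Int]): Int =
    if (ints.size <= 1) ints.headOption.getOrElse(0)
    else {
      val (l, r) = ints.splitAt(ints.length / 2)
      sumSplit(l) + sumSplit(r)
    }
}
```

For example, `SumExamples.sumSplit(Vector(1, 2, 3, 4, 5))` and `SumExamples.sumFold(Vector(1, 2, 3, 4, 5))` both return 15.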
Summing integers is so fast that the overhead of parallelisation would probably outweigh any gain from running in parallel.
However, the operation itself isn't the point: we want to build a simple, composable set of core data types and functions for parallelism.
First, a container type for our results:
Par[A]
Second, a function to create a 'unit' of parallelism:
def unit[A](a: => A): Par[A]
Third, a method to extract the value from a parallel computation:
def get[A](a: Par[A]): A
For now, we don't need to worry about what other functions we require, or about the internal representation of Par.
Let's change our sum example to use this API:
```scala
def sum(ints: IndexedSeq[Int]): Int =
  if (ints.size <= 1)
    ints.headOption getOrElse 0
  else {
    val (l, r) = ints.splitAt(ints.length / 2)
    val sumL: Par[Int] = Par.unit(sum(l))
    val sumR: Par[Int] = Par.unit(sum(r))
    Par.get(sumL) + Par.get(sumR)
  }
```
Should unit start evaluating its argument immediately, or wait?
Is this:

```scala
val sumL: Par[Int] = Par.unit(sum(l))
val sumR: Par[Int] = Par.unit(sum(r))
Par.get(sumL) + Par.get(sumR)
```

equivalent to this?

```scala
Par.get(Par.unit(sum(l))) + Par.get(Par.unit(sum(r)))
```
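If unit begins evaluating its argument concurrently as soon as it is called, then inlining sumL and sumR changes the program's behaviour: get blocks until the left sum has finished before the right sum has even been started, so the program is no longer parallel. In that sense, calling get arguably breaks referential transparency.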
... and we've lost.
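The difference can be made visible with a small experiment. The sketch assumes a hypothetical eager `EagerPar` (not the API the talk ends up with) in which `unit` submits its argument to a thread pool straight away and `get` blocks; `work` is a stand-in for `sum(l)` and `sum(r)` that logs when it starts and finishes.

```scala
import java.util.concurrent.{Callable, Executors, ThreadFactory, Future => JFuture}
import scala.collection.mutable.ListBuffer

// Hypothetical eager design (NOT the API the talk ends up with):
// `unit` starts evaluating its argument on a pool thread immediately,
// and `get` blocks until the result is available.
object EagerPar {
  private val pool = Executors.newCachedThreadPool(new ThreadFactory {
    // Daemon threads, so the demo can exit without an explicit shutdown.
    def newThread(r: Runnable): Thread = { val t = new Thread(r); t.setDaemon(true); t }
  })
  type Par[A] = JFuture[A]
  def unit[A](a: => A): Par[A] = pool.submit(new Callable[A] { def call(): A = a })
  def get[A](p: Par[A]): A = p.get()
}

object EagerDemo {
  import EagerPar._

  private val log = ListBuffer.empty[String]
  private def record(event: String): Unit = log.synchronized { log += event; () }
  // Return the events recorded so far and clear the log.
  private def snapshot(): List[String] = log.synchronized {
    val events = log.toList
    log.clear()
    events
  }

  // Stand-in for sum(l) or sum(r) that logs when it starts and finishes.
  def work(name: String): Int = {
    record(s"start $name")
    Thread.sleep(50)
    record(s"end $name")
    1
  }

  // Named vals: both computations are started before either is awaited.
  def separate(): (Int, List[String]) = {
    val sumL = unit(work("L"))
    val sumR = unit(work("R"))
    val total = get(sumL) + get(sumR)
    (total, snapshot())
  }

  // Inlined: get blocks on the left computation before the right one is created.
  def inlined(): (Int, List[String]) = {
    val total = get(unit(work("L"))) + get(unit(work("R")))
    (total, snapshot())
  }
}
```

In the inlined version the log is always `start L, end L, start R, end R`: the two sums run strictly one after the other. With the named vals, both computations are typically started before either finishes, although the exact interleaving depends on the scheduler.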
It's becoming clear that we need to combine asynchronous computations without waiting for them to finish...
So if unit doesn't start evaluation of a parallel computation immediately, but we want to compute the sums of the two sublists in parallel, then we need some way to combine two parallel computations to create a third.
We have:
Imagine a function map2, defined as:
def map2[A,B,C](a: Par[A], b: Par[B])(f: (A,B) => C): Par[C]
We can use this in our example:
```scala
def sum(ints: IndexedSeq[Int]): Par[Int] =
  if (ints.size <= 1)
    Par.unit(ints.headOption getOrElse 0)
  else {
    val (l, r) = ints.splitAt(ints.length / 2)
    Par.map2(sum(l), sum(r))(_ + _)
  }
```
So we're now no longer calling unit or get in our recursive case, and map2 is free to start evaluating both its arguments in parallel.
Do we always want to evaluate the two arguments to map2 in parallel?
What about in this simple case?
Par.map2(Par.unit(1), Par.unit(1))(_ + _)
With our current API, there is no way for the caller to say whether a computation should be forked onto a separate thread.
Introducing fork:
def fork[A](a: => Par[A]): Par[A]
Adding this to our sum example:
```scala
def sum(ints: IndexedSeq[Int]): Par[Int] =
  if (ints.length <= 1)
    Par.unit(ints.headOption getOrElse 0)
  else {
    val (l, r) = ints.splitAt(ints.length / 2)
    Par.map2(Par.fork(sum(l)), Par.fork(sum(r)))(_ + _)
  }
```
We've put the parallelism explicitly under programmer control, no arbitrary decision making here!
Let's introduce a new primitive for the caller to specify if a computation needs to be forked onto a separate thread:
This solves the problem of instantiating our parallel computations too strictly but, more fundamentally, it puts the parallelism explicitly under programmer control.
Should unit be strict or lazy?
Thanks to fork, we can make it strict.
```scala
def unit[A](a: A): Par[A]
def lazyUnit[A](a: => A): Par[A] = fork(unit(a))
```
lazyUnit is a simple example of a derived combinator, as opposed to a primitive combinator like unit.
Previously, unit accepted its argument lazily, but now that we have fork, unit can accept its argument strictly and we can use a derived combinator lazyUnit for cases when we don't want to fully evaluate a on the current thread.
This new combinator lazyUnit is called a derived combinator, as it is simply implemented in terms of other combinators.
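To see that lazyUnit really is derived, here is a minimal sketch that borrows the `ExecutorService => Future` representation of Par introduced later in the talk. `UnitFuture` (an already-completed `Future`) and the object name are our own; the point is that `lazyUnit` is written purely in terms of `fork` and `unit`, and its argument is not evaluated until the Par is run.

```scala
import java.util.concurrent.{Callable, ExecutorService, Executors, Future, TimeUnit}

// Sketch assuming the ExecutorService => Future representation of Par.
object LazyUnitSketch {
  type Par[A] = ExecutorService => Future[A]

  // An already-completed Future, so that `unit` needs no thread.
  private case class UnitFuture[A](value: A) extends Future[A] {
    def get(): A = value
    def get(timeout: Long, unit: TimeUnit): A = value
    def isDone(): Boolean = true
    def isCancelled(): Boolean = false
    def cancel(mayInterruptIfRunning: Boolean): Boolean = false
  }

  // Primitive: wrap a value that has already been evaluated.
  def unit[A](a: A): Par[A] = (_: ExecutorService) => UnitFuture(a)

  // Primitive: when run, evaluate `a` on a pool thread.
  def fork[A](a: => Par[A]): Par[A] =
    (es: ExecutorService) => es.submit(new Callable[A] { def call(): A = a(es).get() })

  // Derived: defined purely in terms of fork and unit.
  def lazyUnit[A](a: => A): Par[A] = fork(unit(a))

  def run[A](es: ExecutorService)(p: Par[A]): Future[A] = p(es)
}
```

Building `LazyUnitSketch.lazyUnit { println("evaluating"); 42 }` prints nothing; the message only appears once the resulting Par is passed to `run` with an executor.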
We haven't actually defined unit anywhere yet - we will get to that later
Par is not a container that we can get a value from
Par is more like a description of a parallel computation
Just like we decided that unit should not have any side effects, it doesn't seem like a good idea for fork to start evaluating values on a separate thread as soon as it's called.
What we do instead is to think of fork as just 'marking' an unevaluated Par for concurrent evaluation.
Par is now a data structure describing a computation, not some kind of container that we can extract values from when required
To make it clear that a Par is basically a program that can be run, let's rename get:
def run[A](a: Par[A]): A
Par is now a pure data structure, and run has to have some means of implementing the parallelism...
To make it clear that a Par is describing a computation that can be run, we'll rename get to something more meaningful.
Our initial API has changed quite a bit, and that's a good thing!
We're making progress and learning about deficiencies in our initial design.
... (pause) ...
Now, we need to give run some means of implementing parallelism, but before that we need to work out how to represent a Par.
Let's review our updated API for Par:
```scala
// Create a computation that returns immediately
def unit[A](a: A): Par[A]

// Combine the results of two parallel computations
def map2[A,B,C](a: Par[A], b: Par[B])(f: (A,B) => C): Par[C]

// Mark a computation to be executed concurrently
def fork[A](a: => Par[A]): Par[A]

// Fully evaluate a given Par, spawning parallel computations
// as requested by fork
def run[A](a: Par[A]): A
```
Can run implement parallelism using a java.util.ExecutorService?
```scala
import java.util.concurrent.TimeUnit

// Simplified Scala transcription of the java.util.concurrent API
trait ExecutorService {
  def submit[A](a: Callable[A]): Future[A]
}
trait Callable[A] { def call: A } // just a lazy A
trait Future[A] {
  def get: A
  def get(timeout: Long, unit: TimeUnit): A
  def cancel(evenIfRunning: Boolean): Boolean
  def isDone: Boolean
  def isCancelled: Boolean
}
```
Could run use a Java ExecutorService to implement parallelism?
This is the Java API transcribed to Scala.
For now, let's assume an ExecutorService is appropriate:
def run[A](s: ExecutorService)(a: Par[A]): A
Now, if we define Par[A] to be:
type Par[A] = ExecutorService => Future[A]
Then, if we let run return a Future[A] rather than an A, it becomes trivial to implement:
def run[A](s: ExecutorService)(a: Par[A]): Future[A] = a(s)
By having Par[A] defined this way, we give the caller the ability to choose how long to wait for a computation or whether to cancel it etc.
Par is a function that needs an ExecutorService, so the creation of the Future doesn't happen until it's provided.
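Because run now hands back the `java.util.concurrent.Future`, the caller decides how long to wait and whether to give up. A minimal sketch; `slow` and `runWithTimeout` are hypothetical helpers for illustration, not part of the talk's API.

```scala
import java.util.concurrent.{Callable, ExecutorService, Executors, Future, TimeUnit, TimeoutException}

object RunSketch {
  type Par[A] = ExecutorService => Future[A]

  def run[A](es: ExecutorService)(p: Par[A]): Future[A] = p(es)

  // Hypothetical helper: a Par that sleeps for `millis` before producing `result`.
  def slow(millis: Long, result: Int): Par[Int] =
    (es: ExecutorService) => es.submit(new Callable[Int] {
      def call(): Int = { Thread.sleep(millis); result }
    })

  // Hypothetical helper: wait at most `timeoutMs`; on timeout, cancel and return None.
  def runWithTimeout[A](es: ExecutorService)(p: Par[A], timeoutMs: Long): Option[A] = {
    val f = run(es)(p)
    try Some(f.get(timeoutMs, TimeUnit.MILLISECONDS))
    catch {
      case _: TimeoutException =>
        f.cancel(true) // interrupts the worker thread if it is still running
        None
    }
  }
}
```

The waiting policy lives entirely with the caller: the same Par can be awaited indefinitely with `get()` or abandoned after a deadline.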
Before we move on, let's look at what is possible with the API we have so far.
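```scala
import java.util.concurrent._

object Par {
  type Par[A] = ExecutorService => Future[A]

  def run[A](s: ExecutorService)(a: Par[A]): Future[A] = a(s)

  // A Future that is already complete, so unit needs no thread
  private case class UnitFuture[A](value: A) extends Future[A] {
    def get(): A = value
    def get(timeout: Long, units: TimeUnit): A = value
    def isDone(): Boolean = true
    def isCancelled(): Boolean = false
    def cancel(evenIfRunning: Boolean): Boolean = false
  }

  def unit[A](a: A): Par[A] = (es: ExecutorService) => UnitFuture(a)

  // This implementation of `map2` does _not_ respect timeouts
  def map2[A,B,C](a: Par[A], b: Par[B])(f: (A,B) => C): Par[C] =
    (es: ExecutorService) => {
      val af = a(es)
      val bf = b(es)
      UnitFuture(f(af.get, bf.get))
    }
  // ...
```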
```scala
  def map[A,B](pa: Par[A])(f: A => B): Par[B] =
    map2(pa, unit(()))((a, _) => f(a))

  def sortPar(parList: Par[List[Int]]) = map(parList)(_.sorted)

  def fork[A](a: => Par[A]): Par[A] =
    es => es.submit(new Callable[A] {
      def call = a(es).get
    })
  // ...
```
```scala
  def sequence[A](ps: List[Par[A]]): Par[List[A]] =
    sys.error("Do your homework!")

  // asyncF (not shown) turns an A => B into an A => Par[B]
  def parMap[A,B](ps: List[A])(f: A => B): Par[List[B]] = fork {
    val fbs: List[Par[B]] = ps.map(asyncF(f))
    sequence(fbs)
  }
}
```
We're wrapping the body of parMap in a fork, so parMap returns immediately; when we later call run, it spawns one computation, which itself spawns N computations (one per list element).
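For completeness, here is one way the pieces could fit together. This is a sketch, not the book's reference solution: `UnitFuture` is a minimal already-completed `Future`, `asyncF` wraps a function with `lazyUnit`, and `sequence` (the homework above) is one possible answer, a right fold over `map2`.

```scala
import java.util.concurrent.{Callable, ExecutorService, Executors, Future, TimeUnit}

// Sketch assuming the ExecutorService => Future representation of Par.
object ParMapSketch {
  type Par[A] = ExecutorService => Future[A]

  private case class UnitFuture[A](value: A) extends Future[A] {
    def get(): A = value
    def get(timeout: Long, unit: TimeUnit): A = value
    def isDone(): Boolean = true
    def isCancelled(): Boolean = false
    def cancel(mayInterruptIfRunning: Boolean): Boolean = false
  }

  def unit[A](a: A): Par[A] = (_: ExecutorService) => UnitFuture(a)

  // Start both computations before waiting on either.
  def map2[A, B, C](a: Par[A], b: Par[B])(f: (A, B) => C): Par[C] =
    (es: ExecutorService) => {
      val af = a(es)
      val bf = b(es)
      UnitFuture(f(af.get(), bf.get()))
    }

  def fork[A](a: => Par[A]): Par[A] =
    (es: ExecutorService) => es.submit(new Callable[A] { def call(): A = a(es).get() })

  def lazyUnit[A](a: => A): Par[A] = fork(unit(a))

  // Turn any function into one that evaluates its result asynchronously.
  def asyncF[A, B](f: A => B): A => Par[B] = a => lazyUnit(f(a))

  // One possible solution to the homework: combine the Pars pairwise with map2.
  def sequence[A](ps: List[Par[A]]): Par[List[A]] =
    ps.foldRight(unit(List.empty[A]))((p, acc) => map2(p, acc)(_ :: _))

  def parMap[A, B](ps: List[A])(f: A => B): Par[List[B]] = fork {
    val fbs: List[Par[B]] = ps.map(asyncF(f))
    sequence(fbs)
  }
}
```

Running `ParMapSketch.parMap(List(1, 2, 3))(_ * 2)` on an executor yields `List(2, 4, 6)`. The outer fork keeps one thread blocked while it waits for the futures it spawned, so a cached thread pool is the safe choice here.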
Time to step back from blindly following the types and formalize the laws that we expect to hold
A concrete example:
map(unit(1))(_ + 1) == unit(2)
or more generally:
map(unit(x))(f) == unit(f(x))
and when f is id:
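```scala
map(unit(x))(id) == unit(id(x)) // substitute the identity function for f
map(unit(x))(id) == unit(x)     // simplify id(x) to x
map(y)(id) == y                 // substitute y for unit(x) on both sides
```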
Here we are saying this should hold for any choice of f and x, which places some useful constraints:
Given these constraints, we can reason about what happens when f is id, which leads us to a simpler law that doesn't mention unit at all.
Laws like these are very handy when reasoning about your code and when testing it using property based testing, as we will see in the next chapter.
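A law can be spot-checked by evaluating both sides and comparing the results. This sketch assumes the `ExecutorService => Future` representation from later in the talk and a hypothetical `equal` helper; checking a handful of inputs is evidence, not a proof.

```scala
import java.util.concurrent.{ExecutorService, Executors, Future, TimeUnit}

object LawSketch {
  type Par[A] = ExecutorService => Future[A]

  private case class UnitFuture[A](value: A) extends Future[A] {
    def get(): A = value
    def get(timeout: Long, unit: TimeUnit): A = value
    def isDone(): Boolean = true
    def isCancelled(): Boolean = false
    def cancel(mayInterruptIfRunning: Boolean): Boolean = false
  }

  def unit[A](a: A): Par[A] = (_: ExecutorService) => UnitFuture(a)

  def map2[A, B, C](a: Par[A], b: Par[B])(f: (A, B) => C): Par[C] =
    (es: ExecutorService) => {
      val af = a(es)
      val bf = b(es)
      UnitFuture(f(af.get(), bf.get()))
    }

  // map in terms of map2, as in the Par object above.
  def map[A, B](pa: Par[A])(f: A => B): Par[B] = map2(pa, unit(()))((a, _) => f(a))

  // Hypothetical helper: run both sides on `es` and compare their results.
  def equal[A](es: ExecutorService)(p1: Par[A], p2: Par[A]): Boolean =
    p1(es).get() == p2(es).get()
}
```

For instance, `equal(es)(map(unit(1))(_ + 1), unit(2))` checks the concrete law, and `equal(es)(map(unit(x))(y => y), unit(x))` checks the identity law for one particular `x`.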
Mention parametricity?
fork should not affect the result of a parallel computation:
fork(x) == x
This places strong constraints on our implementation.
We’re expecting that fork(x) == x for all choices of x, and any choice of ExecutorService.
Some ExecutorServices are backed by a fixed size thread pool...
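```scala
val a = lazyUnit(42 + 1)
val S = Executors.newFixedThreadPool(1)
// equal runs both Pars on S and compares their results;
// with the fork implementation above, this call never completes
println(Par.equal(S)(a, fork(a)))
```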
Remember earlier, the issue with our implementation of fork?
We’re submitting the Callable first, and within that Callable, we’re submitting another Callable to the ExecutorService and blocking on its result
There is no way to fix this within the current implementation; we need a different representation for Par.
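The deadlock can be reproduced without hanging the program by waiting with a timeout. This sketch repeats `unit`, `fork` and `lazyUnit` from the `Par` object above so that it runs on its own; `timesOut` is a hypothetical helper.

```scala
import java.util.concurrent.{Callable, ExecutorService, Executors, Future, TimeUnit, TimeoutException}

object DeadlockSketch {
  type Par[A] = ExecutorService => Future[A]

  private case class UnitFuture[A](value: A) extends Future[A] {
    def get(): A = value
    def get(timeout: Long, unit: TimeUnit): A = value
    def isDone(): Boolean = true
    def isCancelled(): Boolean = false
    def cancel(mayInterruptIfRunning: Boolean): Boolean = false
  }

  def unit[A](a: A): Par[A] = (_: ExecutorService) => UnitFuture(a)

  // The blocking fork: the outer Callable submits the inner one and waits for it.
  def fork[A](a: => Par[A]): Par[A] =
    (es: ExecutorService) => es.submit(new Callable[A] { def call(): A = a(es).get() })

  def lazyUnit[A](a: => A): Par[A] = fork(unit(a))

  // Hypothetical helper: does fork(lazyUnit(42 + 1)) fail to finish within the timeout?
  def timesOut(es: ExecutorService, timeoutMs: Long): Boolean = {
    val a = lazyUnit(42 + 1)
    try { fork(a)(es).get(timeoutMs, TimeUnit.MILLISECONDS); false }
    catch { case _: TimeoutException => true }
  }
}
```

With a single-thread pool, the only worker runs the outer Callable and blocks waiting for the inner one, which sits in the queue forever, so `timesOut` returns true. With two threads the inner Callable gets a thread of its own and the result (43) arrives. Any fixed pool can be exhausted this way by nesting enough forks.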
Homework time!
This is a really good example and shows how you can use side effects as an implementation detail and still have a purely functional API. Before we finish up, I want to talk about ...
Functional design is an iterative process: when using an API, you might find that you need some new combinator.
Sometimes this combinator is just a special form of a more general combinator.
Given a computation, use it to choose another:
def choice[A](cond: Par[Boolean])(t: Par[A], f: Par[A]): Par[A]
We can make this more general by allowing n alternatives:
def choiceN[A](n: Par[Int])(choices: List[Par[A]]): Par[A]
But why require a list of alternatives?
def chooser[A,B](pa: Par[A])(choices: A => Par[B]): Par[B]
Why restrict the caller to just two alternatives?
The type signature of chooser may look similar to functions we've seen in earlier sections...
```scala
def chooser[A,B](pa: Par[A])(choices: A => Par[B]): Par[B]
def flatMap[A,B](fa: F[A])(f: A => F[B]): F[B]
```
As we have seen in most chapters, if we abstract far enough we end up with flatMap, sequence and friends. We've seen flatMap specialised for lists and random number generators, but flatMap is a very general function that we will look at in more detail later.
We could take this process of generalisation further, but by now you probably get the point: before implementing a specific combinator, check whether it's actually a special case of a more general combinator that could be reused elsewhere.
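To make the generalisation concrete, here is a sketch in which `choice` and `choiceN` are both derived from `chooser`. It assumes the `ExecutorService => Future` representation of Par; this `chooser` simply blocks on the first result before running the selected computation, which is one possible implementation rather than the book's.

```scala
import java.util.concurrent.{ExecutorService, Executors, Future, TimeUnit}

object ChooserSketch {
  type Par[A] = ExecutorService => Future[A]

  private case class UnitFuture[A](value: A) extends Future[A] {
    def get(): A = value
    def get(timeout: Long, unit: TimeUnit): A = value
    def isDone(): Boolean = true
    def isCancelled(): Boolean = false
    def cancel(mayInterruptIfRunning: Boolean): Boolean = false
  }

  def unit[A](a: A): Par[A] = (_: ExecutorService) => UnitFuture(a)
  def run[A](es: ExecutorService)(p: Par[A]): Future[A] = p(es)

  // Run `pa`, then use its result to select the computation to run next.
  // Note: this blocks the calling thread while waiting for `pa`.
  def chooser[A, B](pa: Par[A])(choices: A => Par[B]): Par[B] =
    (es: ExecutorService) => {
      val a = run(es)(pa).get()
      run(es)(choices(a))
    }

  // Both special cases are one-liners in terms of chooser.
  def choice[A](cond: Par[Boolean])(t: Par[A], f: Par[A]): Par[A] =
    chooser(cond)(b => if (b) t else f)

  def choiceN[A](n: Par[Int])(choices: List[Par[A]]): Par[A] =
    chooser(n)(i => choices(i))
}
```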
We have:
Further reading if you're interested in this domain:
i.e. we've defined a collection of data types and functions and laws/properties that express relationships between these functions.
Defining a property:
```scala
import org.scalacheck.Gen
import org.scalacheck.Prop.forAll

val intList = Gen.listOf(Gen.choose(0, 100))
val prop =
  forAll(intList)(ns => ns.reverse.reverse == ns) &&
  forAll(intList)(ns => ns.headOption == ns.reverse.lastOption)
```
Checking a property:
```
scala> prop.check
+ OK, passed 100 tests.
```
Is everyone here familiar with the concept of property based testing?
The idea of property based testing is to decouple the specification of program behavior from the creation of test cases. The programmer focuses on specifying the behavior of programs and giving high-level constraints on the test cases.
A type for a generator for values of type A:
Gen[A]
A function to create a generator for lists of values of type A:
def listOfN[A](n: Int, a: Gen[A]): Gen[List[A]]
A function to create a property:
def forAll[A](a: Gen[A])(f: A => Boolean): Prop
What we have here is a rough draft of our API - it's not complete, and as we've seen in the previous chapter it will change as we work toward our final solution.
We'll start by reading off some types and functions from the ScalaCheck example
We'll need a type for a generator of values of type A. We make it generic so that we don't need a separate generator type for every input type we want to test.
As long as listOfN is told how to generate values to put in the list, it doesn't need to care about the type of elements in the list, so we can make it polymorphic
We then have forAll, which, given a generator and a predicate, returns a property (Prop), as in the ScalaCheck example.
What do we want to do with properties?
So far our API consists of forAll, && and check.
Simplest representation:
trait Prop { def check: Boolean }
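Even this simplest representation supports the && used in the ScalaCheck example. A minimal sketch, wrapped in a hypothetical `BooleanProps` object; the `prop` helper is ours.

```scala
object BooleanProps {
  trait Prop { self =>
    def check: Boolean

    // Both properties must hold; `that` is only checked if `self` passes.
    def &&(that: Prop): Prop = new Prop {
      def check: Boolean = self.check && that.check
    }
  }

  // Hypothetical helper: build a Prop from a by-name Boolean.
  def prop(c: => Boolean): Prop = new Prop { def check: Boolean = c }
}
```

This composes, but a bare Boolean cannot report which input failed or how many cases passed, which is what motivates a richer return type.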
Better representation:
```scala
object Prop {
  type FailedCase = String
  type SuccessCount = Int
}
trait Prop {
  import Prop._
  def check: Either[FailedCase, SuccessCount]
}
```
What if we just wrap a state transition and randomly generate values?
case class Gen[A](sample: State[RNG,A])
We've seen State in earlier chapters, but here's the definition again:
case class State[S,A](run: S => (A,S))
At this stage, all we want to do is randomly generate values, so we could just make Gen a type that wraps a State transition over a random number generator, which gets us a long way toward our desired behaviour.
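Here is a minimal sketch of that idea. `RNG`, `SimpleRNG` and `State` follow the shapes used in earlier chapters (the generator uses the same linear congruential constants as `java.util.Random`); `nonNegativeInt`, `choose` and `listOfN` are our own minimal versions, and the object name is hypothetical.

```scala
// Sketch: Gen as a wrapper around a State transition over an RNG.
object GenSketch {
  trait RNG { def nextInt: (Int, RNG) }

  // Linear congruential generator, using the same constants as java.util.Random.
  case class SimpleRNG(seed: Long) extends RNG {
    def nextInt: (Int, RNG) = {
      val newSeed = (seed * 0x5DEECE66DL + 0xBL) & 0xFFFFFFFFFFFFL
      ((newSeed >>> 16).toInt, SimpleRNG(newSeed))
    }
  }

  case class State[S, A](run: S => (A, S)) {
    def map[B](f: A => B): State[S, B] =
      State((s: S) => { val (a, s1) = run(s); (f(a), s1) })
  }

  case class Gen[A](sample: State[RNG, A])

  // A non-negative Int; negative values are mapped with -(i + 1) so Int.MinValue is safe.
  val nonNegativeInt: State[RNG, Int] =
    State((rng: RNG) => {
      val (i, r) = rng.nextInt
      (if (i < 0) -(i + 1) else i, r)
    })

  // An Int in [start, stopExclusive); the modulo makes it only roughly uniform.
  def choose(start: Int, stopExclusive: Int): Gen[Int] =
    Gen(nonNegativeInt.map(n => start + n % (stopExclusive - start)))

  // Thread the RNG through n samples of g, keeping them in generation order.
  def listOfN[A](n: Int, g: Gen[A]): Gen[List[A]] =
    Gen(State((rng: RNG) => {
      val (reversed, last) = (1 to n).foldLeft((List.empty[A], rng)) {
        case ((acc, r), _) =>
          val (a, r1) = g.sample.run(r)
          (a :: acc, r1)
      }
      (reversed.reverse, last)
    }))
}
```

Because the RNG is passed explicitly, sampling with the same seed always yields the same values, which makes failing cases reproducible.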
How could we make a Gen[(String,String)] where the second string contains only characters from the first?
Remember chooser?
def chooser[A,B](pa: Par[A])(choices: A => Par[B]): Par[B]
def flatMap[A,B](ga: Gen[A])( f : A => Gen[B]): Gen[B]
This is similar to a problem we've seen before, when we wanted to choose a parallel computation to run based on the result of an earlier computation.
We can already express some more complicated generators using combinators we're familiar with.
This symmetry between chooser and flatMap is no coincidence, but we will see that in more detail later in the book.
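Here is a sketch of the `Gen[(String, String)]` problem using flatMap: generate the first string, then use it to build the generator for the second. The helper names (`lowerString`, `pairGen`) and the restriction to lowercase letters are assumptions for illustration. For brevity, `Gen` wraps the state-transition function directly instead of going through `State`, so the block runs on its own.

```scala
// Sketch: a Gen[(String, String)] where the second string only uses characters of the first.
object PairGenSketch {
  trait RNG { def nextInt: (Int, RNG) }
  case class SimpleRNG(seed: Long) extends RNG {
    def nextInt: (Int, RNG) = {
      val newSeed = (seed * 0x5DEECE66DL + 0xBL) & 0xFFFFFFFFFFFFL
      ((newSeed >>> 16).toInt, SimpleRNG(newSeed))
    }
  }

  // For brevity, Gen wraps the state-transition function directly instead of State.
  case class Gen[A](sample: RNG => (A, RNG)) {
    def map[B](f: A => B): Gen[B] =
      Gen((rng: RNG) => { val (a, r) = sample(rng); (f(a), r) })
    // Use the generated value to decide which generator to run next.
    def flatMap[B](f: A => Gen[B]): Gen[B] =
      Gen((rng: RNG) => { val (a, r) = sample(rng); f(a).sample(r) })
  }

  def unit[A](a: A): Gen[A] = Gen((rng: RNG) => (a, rng))

  def choose(start: Int, stopExclusive: Int): Gen[Int] =
    Gen((rng: RNG) => {
      val (i, r) = rng.nextInt
      val n = if (i < 0) -(i + 1) else i
      (start + n % (stopExclusive - start), r)
    })

  def listOfN[A](n: Int, g: Gen[A]): Gen[List[A]] =
    if (n <= 0) unit(List.empty[A])
    else g.flatMap(a => listOfN(n - 1, g).map(as => a :: as))

  // Hypothetical: a non-empty lowercase string of at most maxLen characters.
  def lowerString(maxLen: Int): Gen[String] =
    choose(1, maxLen + 1).flatMap { len =>
      listOfN(len, choose('a'.toInt, 'z'.toInt + 1).map(_.toChar)).map(_.mkString)
    }

  // Generate s1 first, then build a generator that picks characters from s1.
  def pairGen(maxLen: Int): Gen[(String, String)] =
    lowerString(maxLen).flatMap { s1 =>
      choose(0, maxLen + 1).flatMap { len2 =>
        listOfN(len2, choose(0, s1.length).map(i => s1.charAt(i)))
          .map(cs => (s1, cs.mkString))
      }
    }
}
```

`map` alone cannot express this, because the generator for the second string depends on a value that only exists once the first string has been generated.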
We can't just continue checking a Prop indefinitely - we need to be told when to stop!
```scala
type TestCases = Int
type Result = Option[(FailedCase, SuccessCount)]
case class Prop(run: TestCases => Result)
```
Taking a look at properties again ...
With what we've written so far, there is no way for the framework to know when to stop generating inputs and evaluating the predicate - rather than making an assumption of always running, say, 10 test cases, we can make this explicit in the API and leave it up to the programmer.
We can always add helper methods with sensible defaults later, but we'll need to update our Prop type to support this.
As we are now telling the Prop how many cases to run, there is no point returning a success count when the property passes (it would always equal the number of test cases), so we can go from an Either to an Option.
We're using an Option for a result, where None is success, which is a little confusing. The book goes on to add a more explicit Result type with a clearer meaning.
Prop.run will also require an RNG to sample from the Gen, so we'll add that:
case class Prop(run: (TestCases,RNG) => Result)
So we've given Prop a limit on the number of test cases to run ...
What else does Prop require to generate values and test them?
...
So our run function now takes two arguments.
```scala
// randomStream, buildMsg, Passed and Falsified come from the book's source
def forAll[A](as: Gen[A])(f: A => Boolean): Prop = Prop {
  (n, rng) => randomStream(as)(rng).zip(Stream.from(0)).take(n).map {
    case (a, i) => try {
      if (f(a)) Passed else Falsified(a.toString, i)
    } catch {
      case e: Exception => Falsified(buildMsg(a, e), i)
    }
  }.find(_.isFalsified).getOrElse(Passed)
}
```
We now know enough to implement forAll, which, as we've seen, is how we build properties given a generator and a predicate
Remember that streams are lazily evaluated, so we won't check every input if the first one fails.
We catch exceptions so that we can report which input caused them.
When we're randomly generating test cases, we might generate some ridiculously complicated input as our first example, and if it falsifies our property we're left with a difficult debugging task.
For example, say you have some function over lists. If your program is broken for all lists, then it would be better that you run a test case for a list with a single element in it first, rather than having a 1000 element long list splattered on the console.
Earlier, I mentioned briefly that ScalaCheck solves this problem with test case minimisation (which it calls shrinking): when a test fails, ScalaCheck iteratively searches for a smaller input that still fails.
We're going to use sized generation to get a similar outcome.
This means we will generate inputs of a small size first, and increase once the predicate has been checked for multiple cases of the small size.
For lists, the size is probably the length of the list. For a tree, it might be the height, or the number of nodes.
case class SGen[+A](forSize: Int => Gen[A])
```
scala> import fpinscala.testing._
import fpinscala.testing._

scala> val sgen = SGen(n => Gen.choose(0,10).listOfN(n))
sgen: fpinscala.testing.SGen[List[Int]] = SGen(<function1>)

scala> val gen = sgen(1)
gen: fpinscala.testing.Gen[List[Int]] = Gen(State(<function1>))

scala> gen.sample.run(fpinscala.state.RNG.Simple(11))
res8: (List[Int], fpinscala.state.RNG) = (List(7),Simple(277363943098))
```
To see what it looks like using a sized generator, we can manually sample values from it on the Scala console.
def forAll[A](g: SGen[A])(f: A => Boolean): Prop = sys.error("Nope!")
forAll now needs to change as well, as SGen needs to be told a size.
case class Prop(run: (MaxSize,TestCases,RNG) => Result)
...
Currently our property is responsible for invoking the generator, so we can keep using this pattern by adding a maximum size to the Prop.
This puts the size under the control of the user of our library.
```scala
def forAll[A](g: SGen[A])(f: A => Boolean): Prop = Prop {
  (max, n, rng) =>
    val casesPerSize = (n - 1) / max + 1
    val props: Stream[Prop] =
      Stream.from(0).take((n min max) + 1).map(i => forAll(g(i))(f))
    val prop: Prop =
      props.map(p => Prop { (max, _, rng) =>
        p.run(max, casesPerSize, rng)
      }).toList.reduce(_ && _)
    prop.run(max, n, rng)
}
```
With this new representation for Prop, we can reimplement forAll with a simple algorithm:
First, we work out how many test cases we need to run for each size. If n = 100, and maxSize = 10, then we'll run 10 cases for each size.
Next, we make a stream of properties - one for each size, but no more than n
Now we have a stream of properties, which we can combine using our && operator
Finally, we run this new Prop and return the result
This is a simple implementation, and is just one possible algorithm we could use.
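The arithmetic in the first two steps is easy to check in isolation. `(n - 1) / max + 1` is integer ceiling division, so with n = 100 and max = 10 each size gets 10 cases, and a remainder rounds up rather than being dropped. The object and function names are ours.

```scala
object SizedSchedule {
  // Ceiling of n / max for positive n and max: test cases to run per size.
  def casesPerSize(n: Int, max: Int): Int = (n - 1) / max + 1

  // Sizes forAll visits: Stream.from(0).take((n min max) + 1), i.e. 0 to (n min max) inclusive.
  def sizes(n: Int, max: Int): List[Int] = (0 to (n min max)).toList
}
```

Note that sizes run from 0 up to and including `n min max`, so with n = 100 and max = 10 there are 11 sizes, and up to 11 × 10 = 110 cases may run, slightly more than n.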
Homework time!
The book has a section on using property based testing to validate the laws we covered in the previous chapter.
I definitely recommend working through the exercises in this section, so that you see how to use our property based testing library in a non-trivial domain.
Remember map for Par?
def map[A,B](a: Par[A])(f: A => B): Par[B]
Compare this with map for Gen:
def map[A,B](a: Gen[A])(f: A => B): Gen[B]
The same law even holds!
map(x)(id) == x
Not only similar signatures, these functions also have similar meanings in their respective domains
In part three of the text, we'll learn the names of these patterns and laws that govern them.
There are a great many seemingly distinct problems being solved in the world of software, yet the space of functional solutions is much smaller.
Talk and slides by Matt Newman (@mdjnewman)
Buy the book and look at the exercises and source code
Slides: http://mdjnewman.github.io/talk-fp-in-scala-7-8/
Also, check out @SICPQuotes