
FP in Scala

Chapters 7 & 8

Purely functional parallelism & Property-based testing

Created by Matt Newman for the April 2016 BFPG meetup

Prelude

Part 1: Fundamentals

  • What is functional programming?
  • Getting started with FP in Scala
  • Functional data structures
  • Handling errors without exceptions
  • Strictness and laziness
  • Purely functional state

Part 2: Functional design and combinator libraries

  • Purely functional parallelism
  • Property-based testing
  • Parser combinators

These chapters should be like peering over the shoulder of someone as they think through possible designs.

These chapters look at the impact the fundamentals in Part 1 have on library design, by way of three examples.

Purely functional parallelism

(Chapter 7)

Why purely functional parallelism?

  • Modern computers have multiple cores/multiple CPUs
  • Shared mutable memory remains hard
    • Race conditions
    • Deadlocks
    • Testing is difficult
  • We want to be able to use the substitution model and reason about our code, by separating the concern of describing a computation from actually running it.

A motivating example

To start with, let's look at an example that we can refer back to in order to make things a little more concrete.

Summing a list of integers:

def sum(ints: Seq[Int]): Int =
    ints.foldLeft(0)((a,b) => a + b)

Same operation, using a divide-and-conquer algorithm:

def sum(ints: IndexedSeq[Int]): Int =
    if (ints.size <= 1)
        ints.headOption getOrElse 0
    else {
        val (l,r) = ints.splitAt(ints.length / 2)
        sum(l) + sum(r)
    }

(this implementation can be parallelised)

Summing integers is probably so fast that the overhead of parallelisation is greater than any improvement from running in parallel.

However, we don't care about the operation - we want to build a simple and composable set of core data types and functions for parallelism.

Choosing data types and functions

An initial data type for parallel computations

First, a container type for our results:

Par[A]

Second, a function to create a 'unit' of parallelism:

def unit[A](a: => A): Par[A]

Third, a method to extract the value from a parallel computation:

def get[A](a: Par[A]): A

For now, we don't need to worry about what other functions we require, or internal representation of Par.

Let's change our sum example to use this API:

def sum(ints: IndexedSeq[Int]): Int =
    if (ints.size <= 1)
        ints.headOption getOrElse 0
    else {
        val (l,r) = ints.splitAt(ints.length/2)
        val sumL: Par[Int] = Par.unit(sum(l))
        val sumR: Par[Int] = Par.unit(sum(r))
        Par.get(sumL) + Par.get(sumR)
    }

Should unit start evaluating its argument immediately, or wait?

  • In this example, unit must start evaluating immediately to get any parallelism
    • If we instead wait until get is called, the first call to get will spawn its computation and block until it finishes before the second computation even starts - the two sides of + would run sequentially

Is

val sumL: Par[Int] = Par.unit(sum(l))
val sumR: Par[Int] = Par.unit(sum(r))
Par.get(sumL) + Par.get(sumR)

equivalent to

Par.get(Par.unit(sum(l))) + Par.get(Par.unit(sum(r)))

?

If unit begins evaluating its argument concurrently, then calling get arguably breaks referential transparency: substituting sumL and sumR with their definitions gives the second program, which is no longer parallel, because each get blocks before the next unit is created

... and we've lost.

It's becoming clear that we need to combine asynchronous computations without waiting for them to finish...

So if unit doesn't start evaluation of a parallel computation immediately, but we want to compute the sums of the two sublists in parallel, then we need some way to combine two parallel computations to create a third.

We have:

  • Conjured up a simple example
  • Explored it to uncover a design choice
  • Experimented and learned something fundamental about our domain

Combining parallel computations

Imagine a function map2, defined as:

def map2[A,B,C](a: Par[A], b: Par[B])(f: (A,B) => C): Par[C]

We can use this in our example:

def sum(ints: IndexedSeq[Int]): Par[Int] =
    if (ints.size <= 1)
        Par.unit(ints.headOption getOrElse 0)
    else {
        val (l,r) = ints.splitAt(ints.length/2)
        Par.map2(sum(l), sum(r))(_ + _)
    }

So we're now no longer calling unit or get in our recursive case, and map2 is free to start evaluating both its arguments in parallel.

Explicit forking

Do we always want to evaluate the two arguments to map2 in parallel?

What about in this simple case?

Par.map2(Par.unit(1), Par.unit(1))(_ + _)

With our current API, there is no way for the caller to say whether a computation should be forked onto a separate thread.

Introducing fork:

def fork[A](a: => Par[A]): Par[A]

Adding this to our sum example:

def sum(ints: IndexedSeq[Int]): Par[Int] =
    if (ints.length <= 1)
        Par.unit(ints.headOption getOrElse 0)
    else {
        val (l,r) = ints.splitAt(ints.length/2)
        Par.map2(Par.fork(sum(l)), Par.fork(sum(r)))(_ + _)
    }

We've put the parallelism explicitly under programmer control - no arbitrary decision making here!

fork is a new primitive that lets the caller specify that a computation should be forked onto a separate thread.

This solves the problem of instantiating our parallel computations too strictly, but it also raises a more fundamental question:

Should unit be strict or lazy?

Thanks to fork, we can make it strict.

def unit[A](a: A): Par[A]
def lazyUnit[A](a: => A): Par[A] = fork(unit(a))

lazyUnit is a simple example of a derived combinator, as opposed to a primitive combinator like unit.

Previously, unit accepted its argument lazily, but now that we have fork, unit can accept its argument strictly and we can use a derived combinator lazyUnit for cases when we don't want to fully evaluate a on the current thread.

This new combinator lazyUnit is called a derived combinator, as it is simply implemented in terms of other combinators.

We haven't actually defined unit anywhere yet - we will get to that later

Par is not a container that we can get a value from

Par is more like a description of a parallel computation

Just like we decided that unit should not have any side effects, it doesn't seem like a good idea to have fork begin evaluating on a separate thread as soon as it's called.

What we do instead is to think of fork as just 'marking' an unevaluated Par for concurrent evaluation.

Par is now a data structure describing a computation, not some kind of container that we can extract values from when required

To make it clear that a Par is basically a program that can be run, let's rename get:

def run[A](a: Par[A]): A

Par is now a pure data structure, and run has to have some means of implementing the parallelism...

To make it clear that a Par is describing a computation that can be run, we'll rename get to something more meaningful.

Our initial API has changed quite a bit, and that's a good thing!

We're making progress and learning about deficiencies in our initial design.

... (pause) ...

Now, we need to give run some means of implementing parallelism, but before that we need to work out how to represent a Par.

Choosing a representation

Let's review our updated API for Par:

// Create a computation that returns immediately
def unit[A](a: A): Par[A]

// Combine the results of two parallel computations
def map2[A,B,C](a: Par[A], b: Par[B])(f: (A,B) => C): Par[C]

// Mark a computation to be executed concurrently
def fork[A](a: => Par[A]): Par[A]

// Fully evaluate a given Par, spawning parallel computations
// as requested by fork
def run[A](a: Par[A]): A

Can run implement parallelism using a java.util.concurrent.ExecutorService?

class ExecutorService {
    def submit[A](a: Callable[A]): Future[A]
}

trait Callable[A] { def call: A } // just a lazy A

trait Future[A] {
    def get: A
    def get(timeout: Long, unit: TimeUnit): A
    def cancel(evenIfRunning: Boolean): Boolean
    def isDone: Boolean
    def isCancelled: Boolean
}

Could run use a Java ExecutorService to implement parallelism?

This is the Java API transcribed to Scala.

For now, let's assume an ExecutorService is appropriate:

def run[A](s: ExecutorService)(a: Par[A]): A

Now, if we define Par[A] to be:

type Par[A] = ExecutorService => Future[A]

Then run becomes trivial to implement:

def run[A](s: ExecutorService)(a: Par[A]): Future[A] = a(s)

By having Par[A] defined this way, we give the caller the ability to choose how long to wait for a computation or whether to cancel it etc.

Par is a function that needs an ExecutorService, so the creation of the Future doesn't happen until it's provided.
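
To make that concrete, here's a minimal usage sketch (the pool size and input values are arbitrary, and sum is the Par-returning version from earlier):

import java.util.concurrent.{ExecutorService, Executors}

val es: ExecutorService = Executors.newFixedThreadPool(4)

val sumPar: Par[Int] = sum(Vector(1, 2, 3, 4)) // just a description - nothing runs yet
val future = Par.run(es)(sumPar)               // the ExecutorService is supplied here
val result: Int = future.get                   // block for the final result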

Refining our API

Before we move on, let's look at what is possible with the API we have so far.

object Par {
  type Par[A] = ExecutorService => Future[A]

  def run[A](s: ExecutorService)(a: Par[A]): Future[A] = a(s)

  def unit[A](a: A): Par[A] = (es: ExecutorService) => UnitFuture(a)

  // This implementation of `map2` does _not_ respect timeouts
  def map2[A,B,C](a: Par[A], b: Par[B])(f: (A,B) => C): Par[C] =
    (es: ExecutorService) => {
      val af = a(es)
      val bf = b(es)
      UnitFuture(f(af.get, bf.get))
    }

  // ...

  • UnitFuture just wraps a constant value
  • map2 doesn't evaluate f in a separate logical thread; the caller can wrap it in fork if they want that
  • To support timeouts in map2, we'd need a new implementation of Future that keeps track of the amount of time spent evaluating af
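
For reference, here's UnitFuture as defined in the book's accompanying code - it's "already done", so get returns immediately:

private case class UnitFuture[A](get: A) extends Future[A] {
  def isDone = true
  def get(timeout: Long, units: TimeUnit) = get
  def isCancelled = false
  def cancel(evenIfRunning: Boolean): Boolean = false
}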
  def map[A,B](pa: Par[A])(f: A => B): Par[B] =
    map2(pa, unit(()))((a,_) => f(a))

  def sortPar(parList: Par[List[Int]]) = map(parList)(_.sorted)

  def fork[A](a: => Par[A]): Par[A] =
    es => es.submit(new Callable[A] {
      def call = a(es).get
    })

  // ...

  • map can lift any function of type A => B to operate over a Par
  • This implementation of fork is simple, but has some problems
    • The Callable we pass to es.submit will block waiting for the inner get to complete - so we're using two threads

def sequence[A](ps: List[Par[A]]): Par[List[A]] =
  sys.error("Do your homework!")

def parMap[A,B](ps: List[A])(f: A => B): Par[List[B]] = fork {
  val fbs: List[Par[B]] = ps.map(asyncF(f))
  sequence(fbs)
}

We're wrapping the body of parMap in a fork - it will always return immediately, and when we call run later it will spawn a computation which itself spawns N computations
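
parMap relies on asyncF, which lifts any function A => B to one that evaluates its result asynchronously - it's a one-liner in terms of lazyUnit:

def asyncF[A,B](f: A => B): A => Par[B] =
  a => lazyUnit(f(a))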

Laws for our API

Time to step back from blindly following the types and formalize the laws that we expect to hold

The law of mapping

A concrete example:

map(unit(1))(_ + 1) == unit(2)

or more generally:

map(unit(x))(f) == unit(f(x))

and when f is id:

map(unit(x))(id) == unit(id(x))
map(unit(x))(id) == unit(x)
map(   y   )(id) == y

Here we are saying this should hold for any choice of f and x, which places some useful constraints:

  • Our implementation of unit can't inspect the provided values and behave differently depending on input
    • So no downcasting or isInstanceOf checks

Given these constraints, we can reason about what happens when f is id, which leads us to a simpler law that doesn't mention unit at all.

Laws like these are very handy when reasoning about your code and when testing it using property based testing, as we will see in the next chapter.

Mention parametricity?

The law of forking

fork should not affect the result of a parallel computation:

fork(x) == x

This places strong constraints on our implementation.

We’re expecting that fork(x) == x for all choices of x, and any choice of ExecutorService.

Some ExecutorServices are backed by a fixed size thread pool...

A subtle bug

val a = lazyUnit(42 + 1)
val S = Executors.newFixedThreadPool(1)
println(Par.equal(S)(a, fork(a)))
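
Par.equal isn't shown above; a minimal definition (matching the book's accompanying code) just runs both sides and compares the results:

def equal[A](e: ExecutorService)(p: Par[A], p2: Par[A]): Boolean =
  p(e).get == p2(e).get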

Remember earlier, the issue with our implementation of fork?

We’re submitting the Callable first, and within that Callable, we’re submitting another Callable to the ExecutorService and blocking on its result

There is no way to fix this - we need a different representation for Par.

Fully non-blocking implementation using Actors

Homework time!

This is a really good example and shows how you can use side effects as an implementation detail and still have a purely functional API.

Refining combinators to their most general form

Before we finish up, I want to talk about ...

Functional design is an iterative process: when using an API, you might find some new combinator that you need.

Sometimes this combinator is just a special form of a more general combinator.

Choosing between computations

Given a computation, use it to choose another:

def choice[A](cond: Par[Boolean])(t: Par[A], f: Par[A]): Par[A]

We can make this more general by allowing n alternatives:

def choiceN[A](n: Par[Int])(choices: List[Par[A]]): Par[A]

But why require a list of alternatives?

def chooser[A,B](pa: Par[A])(choices: A => Par[B]): Par[B]

Why restrict the caller to just two alternatives?

The type signature of chooser may look familiar to functions we've seen in the earlier sections...

def chooser[A,B](pa: Par[A])(choices: A => Par[B]): Par[B]
def flatMap[A,B](fa:   F[A])(   f   : A =>   F[B]):   F[B]

As we have seen in most chapters, if we abstract enough we end up with flatMap, sequence and friends. We've seen flatMap specialised for lists and random number generators, but flatMap is a very general function that we will look into in more detail later.

We could try to take this process of generalisation further, but by now you probably get the point - before implementing a specific combinator, look to see if it's actually a special case of a more general combinator that could be reused elsewhere.
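
As a quick sketch of that reuse, both of the earlier combinators fall directly out of chooser:

def choice[A](cond: Par[Boolean])(t: Par[A], f: Par[A]): Par[A] =
  chooser(cond)(b => if (b) t else f)

def choiceN[A](n: Par[Int])(choices: List[Par[A]]): Par[A] =
  chooser(n)(i => choices(i))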

Summary

We have:

  • Completed the design of a library for defining parallel and asynchronous computations in a purely functional way
  • Explored the process of functional design to gain a sense of the challenges and solutions
  • Explored what it means for an API to form an algebra

There is plenty of further reading available if you're interested in this domain.

i.e. we've defined a collection of data types and functions and laws/properties that express relationships between these functions.

Property based testing

(Chapter 8)

A brief tour of property-based testing

Defining a property:

val intList = Gen.listOf(Gen.choose(0,100))

val prop =
    forAll(intList)(ns => ns.reverse.reverse == ns) &&
    forAll(intList)(ns => ns.headOption == ns.reverse.lastOption)

Checking a property:

scala> prop.check
+ OK, passed 100 tests.

Is everyone here familiar with the concept of property based testing?

The idea of property based testing is to decouple the specification of program behavior from the creation of test cases. The programmer focuses on specifying the behavior of programs and giving high-level constraints on the test cases.

  • These are examples from ScalaCheck, where we're checking the reverse function on a List data type
  • Here intList is a generator of list of integers
  • forAll creates a property by combining a generator with a predicate
  • In this case, our property is specifying that for every list generated by intList, reversing that list twice should yield the initial list
  • A property is a specification of the program, or an invariant
  • We can then check our property, using its check method. In the case of failure, ScalaCheck will also minimise test cases to the smallest failing case

Choosing data types and functions

Initial snippets of an API

A type for a generator for values of type A:

Gen[A]

A function to create a generator for lists of values of type A:

def listOfN[A](n: Int, a: Gen[A]): Gen[List[A]]

A function to create a property:

def forAll[A](a: Gen[A])(f: A => Boolean): Prop

What we have here is a rough draft of our API - it's not complete, and as we've seen in the previous chapter it will change as we work toward our final solution.

We'll start by reading off some types and functions from the ScalaCheck example

  • We'll need a type for a generator of values of type A - we make this generic so that we don't need a separate generator type for each input type we want to test with

  • As long as listOfN is told how to generate values to put in the list, it doesn't need to care about the type of elements in the list, so we can make it polymorphic

  • We then have forAll, which given a generator and a predicate returns a Prop, as in the ScalaCheck example

  • What do we want to do with properties?

API of properties

So far our API consists of forAll, && and check.

Simplest representation:

trait Prop { def check: Boolean }

Better representation:

object Prop {
    type FailedCase = String
    type SuccessCount = Int
}

trait Prop { def check: Either[FailedCase,SuccessCount] }

  • So we can create Props, combine them and check them
  • A bare Boolean loses information about what caused the failure and how many successes we had
  • It's okay for FailedCase to be a string, as all we're doing with it is printing it out for the user
  • We haven't considered the arguments to check yet - let's take a look at generators first

API of generators

What if we just wrap a state transition and randomly generate values?

case class Gen[A](sample: State[RNG,A])

We've seen State in earlier chapters, but here's the definition again:

case class State[S,A](run: S => (A,S))

At this stage, all we want to do is randomly generate values, so we could just make Gen a type that wraps a State transition over a random number generator, which gets us a long way toward our desired behaviour.
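
For example, the choose generator from the opening ScalaCheck-style example can be implemented this way (assuming the nonNegativeInt function on RNG from the state chapter):

def choose(start: Int, stopExclusive: Int): Gen[Int] =
  Gen(State(RNG.nonNegativeInt).map(n => start + n % (stopExclusive - start)))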

Generators that depend on generated values

How could we make a Gen[(String,String)] where the second string contains only characters from the first?

Remember chooser?

def chooser[A,B](pa: Par[A])(choices: A => Par[B]): Par[B]
def flatMap[A,B](ga: Gen[A])(   f   : A => Gen[B]): Gen[B]

This is a similar problem to one we've seen before, when we wanted to choose a parallel computation to run based on the result of an earlier computation.

We can already express some more complicated generators using combinators we're familiar with.

This symmetry between chooser and flatMap is no coincidence, but we will see that in more detail later in the book.
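
A sketch of flatMap for Gen - it just delegates to the flatMap we already have on State:

case class Gen[A](sample: State[RNG,A]) {
  def flatMap[B](f: A => Gen[B]): Gen[B] =
    Gen(sample.flatMap(a => f(a).sample))
}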

Refining the Prop data type

We can't just continue checking a Prop indefinitely - we need to be told when to stop!

type TestCases = Int
type Result = Option[(FailedCase, SuccessCount)]
case class Prop(run: TestCases => Result)

Taking a look at properties again ...

With what we've written so far, there is no way for the framework to know when to stop generating inputs and evaluating the predicate - rather than making an assumption of always running, say, 10 test cases, we can make this explicit in the API and leave it up to the programmer.

We can always add helper methods with sensible defaults later, but we'll need to update our Prop type to support this.

As we are now telling the Prop how many cases to run, there is no point having run return a success count if the property passes (it will equal the number of test cases for a passing property), so we can go from an Either to an Option.

We're using an Option for a result, where None is success, which is a little confusing. The book goes on to add a more explicit Result type with a clearer meaning.
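
That Result type (which the forAll implementation below uses) looks like this in the book:

sealed trait Result { def isFalsified: Boolean }
case object Passed extends Result { def isFalsified = false }
case class Falsified(failure: FailedCase, successes: SuccessCount) extends Result {
  def isFalsified = true
}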

Prop.run will also require an RNG to sample from the Gen, so we'll add that:

case class Prop(run: (TestCases,RNG) => Result)

So we've given Prop a limit on the number of test cases to run ...

What else does Prop require to generate values and test them?

...

So our run function now takes two arguments.

def forAll[A](as: Gen[A])(f: A => Boolean): Prop = Prop {
  (n,rng) => randomStream(as)(rng).zip(Stream.from(0)).take(n).map {
    case (a, i) => try {
      if (f(a)) Passed else Falsified(a.toString, i)
    } catch { case e: Exception => Falsified(buildMsg(a, e), i) }
  }.find(_.isFalsified).getOrElse(Passed)
}

We now know enough to implement forAll, which, as we've seen, is how we build properties given a generator and a predicate

Remember that streams are lazily evaluated, so we stop generating and checking inputs as soon as one falsifies the property.

We catch exceptions so that we can report which input caused them.
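
forAll leans on two small helpers, defined in the book's accompanying code roughly as:

def randomStream[A](g: Gen[A])(rng: RNG): Stream[A] =
  Stream.unfold(rng)(rng => Some(g.sample.run(rng)))

def buildMsg[A](s: A, e: Exception): String =
  s"test case: $s\n" +
  s"generated an exception: ${e.getMessage}\n" +
  s"stack trace:\n ${e.getStackTrace.mkString("\n")}"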

Sized generation

When we're randomly generating test cases, it might happen that we generate some ridiculously complicated input as our first example and it falsifies our property, leaving us with a difficult debugging task.

For example, say you have some function over lists. If your program is broken for all lists, then it would be better that you run a test case for a list with a single element in it first, rather than having a 1000 element long list splattered on the console.

Earlier, I mentioned briefly that ScalaCheck solves this problem by using test case minimisation - ScalaCheck will iteratively find the smallest test case that fails.

We're going to use sized generation to get a similar outcome.

This means we will generate inputs of a small size first, increasing the size once the predicate has been checked for multiple cases at each smaller size.

For lists, the size is probably the length of the list. For a tree, it might be the height, or the number of nodes.

A type for sized generation

case class SGen[+A](forSize: Int => Gen[A])

  • This keeps our existing Gen type untouched
  • A Gen can be trivially converted into an SGen by ignoring the size requirement (see the sketch below)
  • Many operations on SGen can just delegate to Gen
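
That trivial conversion is a one-liner on Gen (called unsized in the book):

def unsized: SGen[A] = SGen(_ => this)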
scala> import fpinscala.testing._
import fpinscala.testing._

scala> val sgen = SGen(n => Gen.choose(0,10).listOfN(n))
sgen: fpinscala.testing.SGen[List[Int]] = SGen(<function1>)

scala> val gen = sgen(1)
gen: fpinscala.testing.Gen[List[Int]]
        = Gen(State(<function1>))

scala> gen.sample.run(fpinscala.state.RNG.Simple(11))
res8: (List[Int], fpinscala.state.RNG)
        = (List(7),Simple(277363943098))

To see what it looks like using a sized generator, we can manually sample values from it on the Scala console.

def forAll[A](g: SGen[A])(f: A => Boolean): Prop =
  sys.error("Nope!")

forAll now needs to change as well, as SGen needs to be told a size.

Our new Prop:

case class Prop(run: (MaxSize,TestCases,RNG) => Result)

...

Currently our property is responsible for invoking the generator, so we can keep using this pattern by adding a maximum size to the Prop.

This puts the size under the control of the user of our library.

def forAll[A](g: SGen[A])(f: A => Boolean): Prop = Prop {
  (max,n,rng) =>
    val casesPerSize = (n - 1) / max + 1
    val props: Stream[Prop] =
      Stream.from(0).take((n min max) + 1).map(i => forAll(g(i))(f))
    val prop: Prop =
      props.map(p => Prop { (max, _, rng) =>
        p.run(max, casesPerSize, rng)
      }).toList.reduce(_ && _)
    prop.run(max,n,rng)
}

With this new representation for Prop, we can reimplement forAll with a simple algorithm:

  • First, we work out how many test cases we need to run for each size. If n = 100 and max = 10, then we'll run 10 cases for each size.

  • Next, we make a stream of properties - one for each size, but no more than n

  • Now we have a stream of properties, which we can combine using our && operator

  • Finally, we run this new Prop and return the result

This is a simple implementation, and is just one possible algorithm we could use.

Writing a test suite for parallel computations

Homework time!

The book has a section on using property based testing to validate the laws we covered in the previous chapter.

I definitely recommend working through the exercises in this section, so that you see how to use our property based testing library in a non-trivial domain.

The laws of generators

Remember map for Par?:

def map[A,B](a: Par[A])(f: A => B): Par[B]

Compare this with map for Gen:

def map[A,B](a: Gen[A])(f: A => B): Gen[B]

The same law even holds!

map(x)(id) == x

Not only similar signatures, these functions also have similar meanings in their respective domains

In part three of the text, we'll learn the names of these patterns and laws that govern them.

Summary

  • We've learnt more about functional design by creating a library for property based testing
  • Oscillating between the abstract algebra and the concrete representation lets the two inform each other
  • We needed many of the same combinators! (map, flatMap etc)

There are a great many seemingly distinct problems being solved in the world of software, yet the space of functional solutions is much smaller.

  • Iterating in this way means we avoid overfitting the library to a particular representation, while keeping us grounded

FIN

Talk and slides by Matt Newman (@mdjnewman)

Buy the book and look at the exercises and source code

Slides: http://mdjnewman.github.io/talk-fp-in-scala-7-8/

Also, check out @SICPQuotes
