On Github anirothan / StreamsPresentation
A lightweight F#/C# library for efficient functional-style pipelines on streams of data.
Make functional data query pipelines FAST
An automatic query optimizer-compiler for Sequential and Parallel LINQ.
The query
var query = (from num in nums.AsQueryExpr() where num % 2 == 0 select num * num).Sum();
compiles to
int sum = 0;
for (int index = 0; index < nums.Length; index++)
{
    int num = nums[index];
    if (num % 2 == 0)
        sum += num * num;
}
ICOOOLPS'14
source |> inter |> inter |> inter |> terminal
let data = [| 1..10000000 |] |> Array.map int64
data
|> Seq.filter (fun i -> i % 2L = 0L) //lazy
|> Seq.map (fun i -> i + 1L) //lazy
|> Seq.sum //eager, forcing evaluation
let data = [| 1..10000000 |] |> Array.map int64
data
|> Seq.filter (fun i -> i % 2L = 0L) //lazy inter
|> Seq.map (fun i -> i + 1L) //lazy inter
|> Seq.sum //eager terminal, forcing evaluation
The terminal pulls data from the pipeline via IEnumerator.MoveNext() and IEnumerator.Current.
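To make the pull model concrete, here is a minimal sketch of a terminal that drives the enumerator by hand. The helper name sumByPulling is hypothetical and used only for illustration; Seq.sum does the equivalent work internally.

```fsharp
// Hypothetical helper: a pull-based terminal written out by hand.
let sumByPulling (source: seq<int64>) : int64 =
    use e = source.GetEnumerator()
    let mutable total = 0L
    // Each MoveNext() call asks the upstream pipeline for one more element.
    while e.MoveNext() do
        total <- total + e.Current
    total

let data = [| 1..10 |] |> Array.map int64

let result =
    data
    |> Seq.filter (fun i -> i % 2L = 0L) // lazy inter
    |> Seq.map (fun i -> i + 1L)         // lazy inter
    |> sumByPulling                      // eager terminal
```

Every element costs two interface calls per pipeline stage, which is the overhead the push model tries to avoid.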
let data = [| 1..10000000 |] |> Array.map int64
Stream.ofArray data //source
|> Stream.filter (fun i -> i % 2L = 0L) //lazy
|> Stream.map (fun i -> i + 1L) //lazy
|> Stream.sum //eager, forcing evaluation
Stream.ofArray data //source
|> Stream.filter (fun i -> i % 2L = 0L) //lazy
|> Stream.map (fun i -> i + 1L) //lazy
|> Stream.sum //eager, forcing evaluation
The source is pushing data down the pipeline.
Seq.iter : ('T -> unit) -> seq<'T> -> unit
seq<'T> -> ('T -> unit) -> unit
type Stream<'T> = ('T -> unit) -> unit
type Stream<'T> = ('T -> unit) -> unit
Can do map, filter, fold, iter
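A minimal sketch of how these operations could be written against the simple push type. This is an illustration under that type only, not the library's actual implementation; sum is fixed to int64 to keep the sketch short.

```fsharp
// Push-based stream: a function that feeds every element to a consumer.
type Stream<'T> = ('T -> unit) -> unit

module Stream =
    // Source: pushes each array element into the consumer k.
    let ofArray (xs: 'T[]) : Stream<'T> =
        fun k -> for x in xs do k x

    // Intermediates: wrap the downstream consumer, no work happens yet.
    let map (f: 'T -> 'R) (s: Stream<'T>) : Stream<'R> =
        fun k -> s (fun x -> k (f x))

    let filter (p: 'T -> bool) (s: Stream<'T>) : Stream<'T> =
        fun k -> s (fun x -> if p x then k x)

    // Terminals: supply the final consumer, which starts the push.
    let iter (f: 'T -> unit) (s: Stream<'T>) : unit = s f

    let fold (f: 'S -> 'T -> 'S) (init: 'S) (s: Stream<'T>) : 'S =
        let acc = ref init
        s (fun x -> acc.Value <- f acc.Value x)
        acc.Value

    let sum (s: Stream<int64>) : int64 = fold (+) 0L s

let total =
    [| 1..10 |]
    |> Array.map int64
    |> Stream.ofArray
    |> Stream.filter (fun i -> i % 2L = 0L)
    |> Stream.map (fun i -> i + 1L)
    |> Stream.sum
```

Each stage is a single function call on the element, with no enumerator objects in between.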
type Stream<'T> = ('T -> unit) -> unit
Stopping the push early is required for operations such as
Stream.takeWhile : ('T -> bool) -> Stream<'T> -> Stream<'T>
Change
type Stream<'T> = ('T -> unit) -> unit
to
type Stream<'T> = ('T -> bool) -> unit
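A minimal sketch of how the bool-returning consumer enables early termination. It assumes the convention that returning false means "stop pushing"; toList and the counter are illustrative additions, not taken from the library.

```fsharp
// The consumer returns false to ask the source to stop pushing.
type Stream<'T> = ('T -> bool) -> unit

module Stream =
    let ofArray (xs: 'T[]) : Stream<'T> =
        fun k ->
            let mutable i = 0
            let mutable go = true
            // Stop as soon as the consumer declines further elements.
            while go && i < xs.Length do
                go <- k xs.[i]
                i <- i + 1

    let map (f: 'T -> 'R) (s: Stream<'T>) : Stream<'R> =
        fun k -> s (fun x -> k (f x))

    let takeWhile (p: 'T -> bool) (s: Stream<'T>) : Stream<'T> =
        fun k -> s (fun x -> if p x then k x else false)

    // Illustrative terminal: collect everything that reaches the end.
    let toList (s: Stream<'T>) : 'T list =
        let acc = ResizeArray<'T>()
        s (fun x ->
            acc.Add x
            true)
        List.ofSeq acc

// Count how many elements the source actually pushed.
let pushed = ref 0

let taken =
    [| 1..100 |]
    |> Stream.ofArray
    |> Stream.map (fun x ->
        pushed.Value <- pushed.Value + 1
        x)
    |> Stream.takeWhile (fun x -> x < 4)
    |> Stream.toList
```

With the unit-returning type, the source would have pushed all 100 elements; here it stops after the first element that fails the predicate.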
Stream.zip : Stream<'T> -> Stream<'S> -> Stream<'T * 'S>
Zip needs to synchronise the flow of values.
Zip needs to pull!
// ('T -> bool) is the composed continuation with 'T for the current value
// and bool is a flag for early termination
// (unit -> unit) is a function for bulk processing
// (unit -> bool) is a function for on-demand processing

/// Represents a Stream of values.
type Stream<'T> = Stream of (('T -> bool) -> (unit -> unit) * (unit -> bool))
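A sketch of how zip could be built on this representation. It assumes the on-demand function offers at most one source element per call and returns false once the source is exhausted or the consumer has asked to stop; that reading of the comments, and every function body below, is an assumption for illustration rather than the library's code.

```fsharp
type Stream<'T> = Stream of (('T -> bool) -> (unit -> unit) * (unit -> bool))

module Stream =
    let ofArray (xs: 'T[]) : Stream<'T> =
        Stream (fun k ->
            let i = ref 0
            // On-demand: offer one element to the continuation.
            let next () =
                if i.Value < xs.Length then
                    let x = xs.[i.Value]
                    i.Value <- i.Value + 1
                    k x
                else
                    false
            // Bulk: keep pushing until exhausted or told to stop.
            let bulk () = while next () do ()
            (bulk, next))

    let filter (p: 'T -> bool) (Stream g) : Stream<'T> =
        Stream (fun k -> g (fun x -> if p x then k x else true))

    let zip (Stream f1) (Stream f2) : Stream<'T * 'S> =
        Stream (fun k ->
            // Slots filled by the upstream continuations.
            let v1 = ref None
            let v2 = ref None
            let (_, next1) =
                f1 (fun x ->
                    v1.Value <- Some x
                    true)
            let (_, next2) =
                f2 (fun y ->
                    v2.Value <- Some y
                    true)
            let next () =
                v1.Value <- None
                v2.Value <- None
                // Pull each side until it yields a value or runs out.
                while Option.isNone v1.Value && next1 () do ()
                while Option.isNone v2.Value && next2 () do ()
                match v1.Value, v2.Value with
                | Some x, Some y -> k (x, y)
                | _ -> false
            let bulk () = while next () do ()
            (bulk, next))

    let toList (Stream g) : 'T list =
        let acc = ResizeArray<'T>()
        let (bulk, _) =
            g (fun x ->
                acc.Add x
                true)
        bulk ()
        List.ofSeq acc

let evens = [| 1..10 |] |> Stream.ofArray |> Stream.filter (fun x -> x % 2 = 0)
let letters = [| "a"; "b"; "c" |] |> Stream.ofArray
let pairs = Stream.zip evens letters |> Stream.toList
```

Ordinary pipelines use only the bulk function and keep the push-style loop; zip is the one place that steps its inputs one element at a time.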
Implements a rich set of operations
let data = [| 1..10000000 |] |> Array.map int64
data
|> ParStream.ofArray
|> ParStream.filter (fun x -> x % 2L = 0L)
|> ParStream.map (fun x -> x + 1L)
|> ParStream.sum
Example: a word count
We can write functional pipelines with the performance of imperative code.
Stream fusion: from lists to streams to nothing at all, Duncan Coutts, Roman Leshchinskiy, and Don Stewart, ICFP '07
Depends on the compiler's ability to inline.
In principle, the pipeline can always be fused (inlined).
This is not always done by the F# compiler.
by @biboudis
https://github.com/biboudis/sml-streams
MLton appears to always be fusing.