through the lens of OCaml/F#
ICOOOLPS'14 arxiv
1:
// Schematic pipeline: a source, intermediate operations (`inter`), and a
// terminal operation at the end; the names are presumably placeholders.
source |> inter |> inter |> inter |> terminal
1: 2: 3: 4: 5: 6: 7:
// The forward-pipe operator is plain reverse application:
// let (|>) a f = f a

let data = [| 1 .. 10000000 |]

// Sum of the squares of the even elements.
// NOTE(review): the exact result exceeds the range of `int`; F# arithmetic
// is unchecked by default, so the sum wraps around (fine for a benchmark).
data
|> Stream.ofArray
|> Stream.filter (fun i -> i % 2 = 0)
|> Stream.map (fun i -> i * i)
|> Stream.sum
1: 2: 3: 4: 5: 6: 7: 8: 9: 10: 11: 12:
/// Pull-based stream: applying it to () yields a `next` function; each call
/// to `next` returns the following element, and the end of the stream is
/// signalled by raising StreamEnd.
type Stream<'T> = unit -> (unit -> 'T)

/// ofArray : 'T[] -> Stream<'T>
/// Pulls the elements of `values` in index order. Each instantiation gets
/// its own cursor, so the same stream value can be consumed more than once.
let ofArray (values : 'T[]) : Stream<'T> =
    fun () ->
        let index = ref (-1)
        fun () ->
            incr index
            if !index < Array.length values then values.[!index]
            else raise StreamEnd
1: 2: 3: 4: 5:
/// map : ('T -> 'R) -> Stream<'T> -> Stream<'R>
/// Lazily applies `f` to every element pulled from `stream`.
let map f stream =
    fun () ->
        let pullSource = stream ()
        let pullMapped () = f (pullSource ())
        pullMapped
1: 2: 3: 4: 5: 6: 7:
/// filter : ('T -> bool) -> Stream<'T> -> Stream<'T>
/// Keeps pulling from `stream` until an element satisfying `p` is found.
/// Whatever the source raises at its end propagates unchanged.
let filter p stream =
    let rec skipUntilMatch candidate pull =
        match p candidate with
        | true -> candidate
        | false -> skipUntilMatch (pull ()) pull
    fun () ->
        let pull = stream ()
        fun () -> skipUntilMatch (pull ()) pull
1: 2: 3: 4: 5: 6: 7: 8: 9: 10: 11: 12: 13: 14: 15:
/// iter : ('T -> unit) -> Stream<'T> -> unit
/// Drains `stream`, calling `f` on every element, and returns normally
/// once the source raises StreamEnd.
let iter f stream =
    let rec loop v next =
        f v
        loop (next ()) next
    let next = stream ()
    try loop (next ()) next
    with StreamEnd -> ()

/// sum : Stream<int> -> int
/// Adds up every element of the stream.
let sum stream =
    let total = ref 0
    iter (fun v -> total := !total + v) stream
    !total
1: 2: 3: 4: 5:
/// zip : Stream<'T> -> Stream<'R> -> Stream<'T * 'R>
/// Pairs up elements of the two streams position by position; the left
/// element of each pair is pulled before the right one.
let zip stream stream' =
    fun () ->
        let pullLeft = stream ()
        let pullRight = stream' ()
        fun () ->
            let left = pullLeft ()
            let right = pullRight ()
            (left, right)
1: 2: 3: 4: 5: 6: 7: 8: 9: 10: 11: 12: 13: 14: 15: 16: 17: 18:
/// flatMap : ('T -> Stream<'R>) -> Stream<'T> -> Stream<'R>
/// For each element pulled from `stream`, instantiates the inner stream
/// `f v` and drains it before pulling the next outer element. The result
/// ends (raises StreamEnd) when the outer stream does.
let flatMap f stream =
    fun () ->
        let next = stream ()
        // Puller of the inner stream currently being drained. Kept per
        // instantiation so independent consumers do not share state.
        let current = ref None
        fun () ->
            let rec loop () =
                match !current with
                | None ->
                    // Pulling from the outer stream raises StreamEnd when it is
                    // exhausted, which ends the flattened stream too.
                    let inner = f (next ())
                    current := Some (inner ())
                    loop ()
                | Some next' ->
                    // Catch StreamEnd outside the recursive call so a long run
                    // of empty inner streams does not grow the stack.
                    match (try Some (next' ()) with StreamEnd -> None) with
                    | Some v -> v
                    | None ->
                        current := None
                        loop ()
            loop ()
1: 2: 3: 4:
// The same pipeline written against Java 8 style streams, for comparison.
// NOTE(review): `Stream.OfArray` is not a java.util.stream method; with the
// real API this would start from e.g. `Arrays.stream(data)` (an IntStream).
Stream.OfArray(data) .filter(i -> i % 2 == 0) .map(i -> i * i) .sum();
The source is pushing data down the pipeline.
1: 2: 3: 4:
/// iter : ('T -> unit) -> 'T[] -> unit
/// Calls `f` on each element of `values`, in index order.
let iter f (values : 'T[]) = Array.iter f values
1:
// The signature of `iter` over arrays with its two arguments swapped:
// array first, callback second.
'T[] -> ('T -> unit) -> unit
1: 2: 3: 4: 5: 6:
/// Push-based stream: given a consumer `k`, the stream calls `k` on every
/// element it produces.
type Stream<'T> = ('T -> unit) -> unit

/// ofArray : 'T[] -> Stream<'T>
/// Pushes each element of `values` to `k`, in index order.
let ofArray (values : 'T[]) k =
    for i in 0 .. Array.length values - 1 do
        k values.[i]
1: 2: 3: 4: 5:
/// Push-based stream: the stream calls its consumer on every element.
/// (The type parameter was missing, which left 'T unbound.)
type Stream<'T> = ('T -> unit) -> unit

/// map : ('T -> 'R) -> Stream<'T> -> Stream<'R>
/// Wraps the downstream consumer `k` so it receives `f v` for every `v`.
let map f (stream : Stream<'T>) : Stream<'R> =
    fun k -> stream (fun v -> k (f v))
1: 2: 3:
/// filter : ('T -> bool) -> Stream<'T> -> Stream<'T>
/// Forwards to `k` only the elements for which `f` holds.
let filter f stream =
    let guarded k v = if f v then k v
    fun k -> stream (guarded k)
1: 2: 3: 4: 5:
/// sum : Stream<int> -> int
/// Runs the push stream to completion and returns the total of its elements.
let sum stream =
    let total = ref 0
    stream (fun v -> total := !total + v)
    !total
1: 2: 3:
/// flatMap : ('T -> Stream<'R>) -> Stream<'T> -> Stream<'R>
/// For every element pushed by `stream`, builds the inner stream `f v` and
/// runs it into the same downstream consumer `k`.
let flatMap f stream =
    fun k ->
        let pushInner v = f v k
        stream pushInner
1:
// Desired signature of zip for push-based streams:
Stream.zip : Stream<'T> -> Stream<'S> -> Stream<'T * 'S>
Zip needs to synchronise the flow of values.
Zip needs to pull!
1: 2: 3: 4: 5: 6: 7: 8: 9: 10: 11: 12: 13: 14: 15:
// Sketch of zip on top of a push-to-pull conversion. `toPull` is left
// unimplemented (`???`), so this does not compile as written.
// NOTE(review): the pullers `next`/`next'` are created once, when `zip` is
// applied, not once per consumer `k`, so the zipped stream can presumably
// only be run once.
type Stream<'T> = ('T -> unit) -> unit type StreamPull<'T> = unit -> (unit -> 'T) val toPull : Stream<'T> -> StreamPull<'T> let toPull stream = ??? val zip : Stream<'T> -> Stream<'R> -> Stream<'T * 'R> let zip stream stream' = let pullStream, pullStream' = toPull stream, toPull stream' let next, next' = pullStream (), pullStream' () fun k -> try while true do k (next (), next' ()) with StreamEnd -> ()
1: 2: 3: 4: 5: 6: 7: 8:
/// Provides functions for iteration over a stream that has been bound to a
/// consumer: `Bulk` runs the whole stream, while `TryAdvance` performs a
/// single step and returns false once the stream is exhausted.
type Iterable =
    { Bulk : unit -> unit
      TryAdvance : unit -> bool }

/// Represents a Stream of values: given a consumer, it returns the
/// Iterable that drives it.
// A plain abbreviation rather than a single-case union, so that streams can
// be built with `fun k -> ...` and applied directly, as ofArray and toPull do.
type Stream<'T> = ('T -> unit) -> Iterable
1: 2: 3: 4: 5: 6: 7: 8: 9: 10: 11: 12: 13: 14: 15: 16:
/// ofArray : 'T[] -> Stream<'T>
/// Binds `values` to the consumer `k`. `Bulk` pushes every element;
/// `TryAdvance` pushes the next element and returns true, or returns false
/// once all elements have been pushed.
let ofArray (values : 'T[]) =
    fun k ->
        let bulk () =
            for value in values do
                k value
        let index = ref (-1)
        let tryAdvance () =
            incr index
            if !index < Array.length values then
                k values.[!index]
                true
            else
                false
        { Bulk = bulk; TryAdvance = tryAdvance }
1: 2: 3: 4: 5: 6: 7: 8: 9: 10: 11: 12: 13: 14: 15:
/// toPull : Stream<'T> -> StreamPull<'T>
/// Converts a push stream into a pull stream by driving it one TryAdvance
/// step at a time and returning the element the consumer captured.
/// NOTE: only the last element pushed during a step is kept, so a step that
/// pushes several elements loses all but the last one.
let toPull stream =
    fun () ->
        let current = ref None
        let { Bulk = _; TryAdvance = next } = stream (fun v -> current := Some v)
        fun () ->
            let rec loop () =
                if next () then
                    match !current with
                    | Some v ->
                        current := None
                        v
                    // The step produced nothing (e.g. filtered out): step again.
                    | None -> loop ()
                else
                    raise StreamEnd
            loop ()
1: 2: 3: 4: 5: 6: 7: 8: 9: 10: 11: 12: 13: 14: 15: 16: 17: 18:
/// toPull : Stream<'T> -> StreamPull<'T>
/// Buffered push-to-pull conversion: each TryAdvance step may push any
/// number of elements into `pending`, which are then handed out one by one
/// before the next step is taken.
/// NOTE: a step that pushes an unbounded number of elements grows `pending`
/// without limit.
let toPull stream =
    fun () ->
        let pending = new ResizeArray<'T>()
        let { Bulk = _; TryAdvance = step } = stream (fun v -> pending.Add(v))
        let cursor = ref (-1)
        let rec pull () =
            incr cursor
            if !cursor < pending.Count then
                pending.[!cursor]
            else
                // Buffer drained: reset it and run another step.
                pending.Clear()
                cursor := -1
                if step () then pull () else raise StreamEnd
        pull
1: 2: 3: 4: 5: 6: 7: 8:
// Failure case for the buffered toPull: pulling the first element
// presumably makes a single TryAdvance step run the infinite inner stream,
// which keeps filling the buffer.
let pull =
    [| 1 .. 10 |]
    |> Stream.ofArray
    |> Stream.flatMap (fun _ -> Stream.infinite)
    |> Stream.toPull

let next = pull ()
next () // OutOfMemory Exception
Implements a rich set of operations
1: 2: 3: 4: 5: 6:
let data = [| 1 .. 10000000 |]

// Parallel variant: sum of the squares of the even elements.
data
|> ParStream.ofArray
|> ParStream.filter (fun x -> x % 2 = 0)
|> ParStream.map (fun x -> x * x)
|> ParStream.sum
1: 2: 3: 4: 5: 6: 7: 8: 9: 10: 11:
/// Parallel push stream: the argument is a factory (`thunk`) that creates a
/// fresh consumer each time it is called.
type ParStream<'T> = (unit -> ('T -> unit)) -> unit

/// ofArray : 'T[] -> ParStream<'T>
/// Splits `values` with `partitions`, requests one consumer per partition,
/// forks each partition with its consumer and passes the forks to `join`.
/// (`partitions`, `fork` and `join` are helpers not shown here.)
let ofArray values =
    fun thunk ->
        let chunks = partitions values
        // Every consumer is requested before any partition is forked.
        let withConsumers = chunks |> Array.map (fun chunk -> (chunk, thunk ()))
        let forks = withConsumers |> Array.map (fun (chunk, k) -> fork chunk k)
        join forks
1: 2: 3: 4: 5: 6: 7: 8:
/// Parallel push stream: the argument is a factory (`thunk`) that creates a
/// fresh consumer each time it is called.
type ParStream<'T> = (unit -> ('T -> unit)) -> unit

/// map : ('T -> 'R) -> ParStream<'T> -> ParStream<'R>
/// Every consumer requested by the source is the downstream consumer
/// composed with `f`.
let map f stream =
    let mapConsumer thunk () =
        let k = thunk ()
        fun v -> k (f v)
    fun thunk -> stream (mapConsumer thunk)
1: 2: 3: 4: 5: 6: 7: 8: 9: 10: 11:
/// Parallel push stream: the argument is a factory (`thunk`) that creates a
/// fresh consumer each time it is called.
type ParStream<'T> = (unit -> ('T -> unit)) -> unit

/// sum : ParStream<int> -> int
/// Gives every consumer its own accumulator cell, runs the stream, then adds
/// the per-consumer partial sums together.
let sum (stream : ParStream<int>) =
    let partials = new ResizeArray<int ref>()
    stream (fun () ->
        let partial = ref 0
        // NOTE(review): ResizeArray.Add is not thread-safe; this relies on
        // consumers being requested from one thread (ofArray requests them
        // all before forking).
        partials.Add(partial)
        fun v -> partial := !partial + v)
    partials |> Seq.sumBy (fun partial -> !partial)
Example: a word count
1: 2: 3: 4: 5: 6: 7: 8:
// Word count: read the lines of every cloud file, split them into words,
// keep the words accepted by `wordFilter`, count occurrences, and sort by
// descending count (`count` presumably limits how many words are kept).
cfiles
|> CloudStream.ofCloudFiles CloudFile.ReadLines
|> CloudStream.collect Stream.ofSeq
|> CloudStream.collect (fun line -> splitWords line |> Stream.ofArray)
|> CloudStream.filter wordFilter
|> CloudStream.countBy id
|> CloudStream.sortBy (fun (_, c) -> -c) count
|> CloudStream.toCloudArray
Write beautiful functional code with the performance of imperative code.
http://research.microsoft.com/en-us/um/people/simonpj/papers/ndp/haskell-beats-C.pdf
In principle, these pipelines can always be fused (inlined).
Not always done by F#/OCaml compilers.
by @biboudis
https://github.com/biboudis/sml-streams
MLton appears to always be fusing.