through the lens of OCaml/F#
ICOOOLPS'14 arxiv
1:
// Schematic shape of every pipeline below: one source, any number of
// intermediate operators, and a single terminal operation that runs it.
source |> inter |> inter |> inter |> terminal
1: 2: 3: 4: 5: 6: 7:
// let (|>) a f = f a
let data = [| 1..10000000 |]

// NOTE(review): Stream.sum accumulates into an int (and i * i is an int), so
// for this range the result wraps around; the example is about pipeline shape.
data
|> Stream.ofArray
|> Stream.filter (fun i -> i % 2 = 0)
|> Stream.map (fun i -> i * i)
|> Stream.sum
1: 2: 3: 4: 5: 6: 7: 8: 9: 10: 11: 12:
/// A pull-based stream. Invoking the outer function starts a fresh traversal
/// and returns a "next" function; each call to "next" returns the following
/// element, or raises StreamEnd once the stream is exhausted.
type Stream<'T> = unit -> (unit -> 'T)

/// ofArray : 'T[] -> Stream<'T>
/// Yields the elements of values in index order.
let ofArray (values : 'T[]) : Stream<'T> =
    fun () ->
        // Cursor local to this traversal; -1 means "before the first element".
        let index = ref (-1)
        fun () ->
            incr index
            if !index < Array.length values then
                values.[!index]
            else
                raise StreamEnd
1: 2: 3: 4: 5:
/// map : ('T -> 'R) -> Stream<'T> -> Stream<'R>
/// Lazily transforms each element with f at the moment it is pulled.
let map f stream =
    fun () ->
        let pull = stream ()
        fun () -> pull () |> f
1: 2: 3: 4: 5: 6: 7:
/// filter : ('T -> bool) -> Stream<'T> -> Stream<'T>
/// Each pull keeps drawing from the source until an element satisfies p;
/// a StreamEnd raised by the source propagates unchanged.
let filter p stream =
    fun () ->
        let next = stream ()
        let rec skipRejected () =
            let candidate = next ()
            if p candidate then candidate else skipRejected ()
        skipRejected
1: 2: 3: 4: 5: 6: 7: 8: 9: 10: 11: 12: 13: 14: 15:
/// iter : ('T -> unit) -> Stream<'T> -> unit
/// Starts one traversal of stream and applies f to every element; the
/// StreamEnd that ends the traversal is treated as normal termination.
let iter f stream =
    let next = stream ()
    try
        // A loop rather than recursion, so long streams do not depend on
        // tail calls being emitted (they may be off, e.g. in F# Debug builds).
        while true do
            f (next ())
    with StreamEnd -> ()
/// sum : Stream<int> -> int
/// Runs one traversal of an int stream and returns the total of its elements.
let sum stream =
    let total = ref 0
    iter (fun v -> total := !total + v) stream
    !total
1: 2: 3: 4: 5:
/// zip : Stream<'T> -> Stream<'R> -> Stream<'T * 'R>
/// Pairs elements positionally; a StreamEnd from either side propagates
/// and ends the zipped traversal.
let zip stream stream' =
    fun () ->
        let nextLeft = stream ()
        let nextRight = stream' ()
        fun () ->
            let l = nextLeft ()
            let r = nextRight ()
            (l, r)
1: 2: 3: 4: 5: 6: 7: 8: 9: 10: 11: 12: 13: 14: 15: 16: 17: 18:
/// flatMap : ('T -> Stream<'R>) -> Stream<'T> -> Stream<'R>
/// For each outer element v, pulls every element of (f v) before moving on
/// to the next outer element. StreamEnd from the outer stream ends the result.
let flatMap f stream =
    fun () ->
        let next = stream ()
        // "next" function of the running inner traversal, if any. Kept per
        // traversal so that restarting the result starts from scratch.
        let current = ref None
        let rec loop () =
            match !current with
            | None ->
                // Start (apply to ()) the inner stream of the next outer element.
                current := Some ((f (next ())) ())
                loop ()
            | Some innerNext ->
                // Catch StreamEnd without recursing inside the handler, so
                // skipping many empty inner streams remains a tail call.
                let pulled =
                    try Some (innerNext ())
                    with StreamEnd -> None
                match pulled with
                | Some v -> v
                | None ->
                    current := None
                    loop ()
        loop
1: 2: 3: 4:
// Java 8 equivalent of the F# pipeline, with data an int[].
// IntStream (not Stream<T>) is the stream type that offers sum().
java.util.stream.IntStream.of(data)
    .filter(i -> i % 2 == 0)
    .map(i -> i * i)
    .sum();
The source is pushing data down the pipeline.
1: 2: 3: 4:
/// iter : ('T -> unit) -> 'T[] -> unit
/// Calls f on each element of values, first to last.
let iter f values =
    Seq.iter f values
1:
'T[] -> ('T -> unit) -> unit
1: 2: 3: 4: 5: 6:
/// A push-based stream: it is run by handing it a continuation that
/// receives every element in turn.
type Stream<'T> = ('T -> unit) -> unit

/// ofArray : 'T[] -> Stream<'T>
/// Pushes the elements of values, in order, into the continuation.
let ofArray values : Stream<_> =
    fun k -> values |> Seq.iter k
1: 2: 3: 4: 5:
/// A push-based stream: it is run by handing it a continuation that
/// receives every element in turn.
type Stream<'T> = ('T -> unit) -> unit

/// map : ('T -> 'R) -> Stream<'T> -> Stream<'R>
/// Wraps the downstream continuation so every pushed value goes through f.
let map f stream =
    fun k -> stream (fun v -> k (f v))
1: 2: 3:
/// filter : ('T -> bool) -> Stream<'T> -> Stream<'T>
/// Forwards to the downstream continuation only the values satisfying f.
let filter f stream =
    fun k ->
        let forwardIfMatch v =
            if f v then k v
        stream forwardIfMatch
1: 2: 3: 4: 5:
/// sum : Stream<int> -> int
/// Runs the stream to completion, accumulating every pushed value.
let sum stream =
    let total = ref 0
    stream (fun v -> total := !total + v)
    !total
1: 2: 3:
/// flatMap : ('T -> Stream<'R>) -> Stream<'T> -> Stream<'R>
/// For each outer value v, runs the inner stream (f v) with the same
/// downstream continuation, so inner values are pushed straight through.
let flatMap f stream =
    fun k ->
        let runInner v = (f v) k
        stream runInner
1:
Stream.zip : Stream<'T> -> Stream<'S> -> Stream<'T * 'S>
Zip needs to synchronise the flow of values.
Zip needs to pull!
1: 2: 3: 4: 5: 6: 7: 8: 9: 10: 11: 12: 13: 14: 15:
// Push stream: the producer drives the consumer's continuation.
type Stream<'T> = ('T -> unit) -> unit
// Pull stream: the consumer requests each element; StreamEnd marks the end.
type StreamPull<'T> = unit -> (unit -> 'T)
// Converting a push stream into a pull stream is the open question posed
// here; the body is deliberately left as a placeholder.
val toPull : Stream<'T> -> StreamPull<'T>
let toPull stream = ???
/// zip : Stream<'T> -> Stream<'R> -> Stream<'T * 'R>
/// Pairs elements positionally. Both push streams are converted to pull
/// streams, because zip must advance the two sides in lockstep.
let zip stream stream' =
    let pullStream, pullStream' = toPull stream, toPull stream'
    fun k ->
        // Start fresh pull traversals each time the zipped stream is run;
        // starting them once, outside k, made the result usable only once.
        let next, next' = pullStream (), pullStream' ()
        try
            while true do
                k (next (), next' ())
        with StreamEnd -> ()
1: 2: 3: 4: 5: 6: 7: 8:
/// Provides functions for iteration
type Iterable = {
    /// Pushes every element to the continuation in one go.
    Bulk : unit -> unit
    /// Advances the source by one step, pushing whatever that step produces;
    /// returns false when there is nothing left.
    TryAdvance : unit -> bool
}

/// Represents a Stream of values: given the downstream continuation, it
/// returns an Iterable that drives it either in bulk or step by step.
/// (A plain function type, matching how ofArray/toPull build and call it.)
type Stream<'T> = ('T -> unit) -> Iterable
1: 2: 3: 4: 5: 6: 7: 8: 9: 10: 11: 12: 13: 14: 15: 16:
/// ofArray : 'T[] -> Stream<'T>
/// Offers both iteration strategies over values. Bulk pushes every element.
/// TryAdvance pushes the next unvisited element and reports whether there was
/// one. Bulk does not use or move the TryAdvance cursor.
let ofArray (values : 'T[]) =
    fun k ->
        let bulk () =
            for value in values do
                k value
        // Cursor for step-wise iteration; -1 means "nothing pushed yet".
        let index = ref (-1)
        let tryAdvance () =
            incr index
            if !index < Array.length values then
                k values.[!index]
                true
            else
                false
        { Bulk = bulk; TryAdvance = tryAdvance }
1: 2: 3: 4: 5: 6: 7: 8: 9: 10: 11: 12: 13: 14: 15:
/// toPull : Stream<'T> -> StreamPull<'T>
/// Converts a push stream into a pull stream. It drives TryAdvance one step
/// at a time and captures the pushed value in a single-slot cell.
/// Limitation: if one step pushes several values, each overwrites the
/// previous one and only the last survives.
let toPull stream =
    fun () ->
        let current = ref None
        let { Bulk = _; TryAdvance = next } = stream (fun v -> current := Some v)
        let rec loop () =
            if not (next ()) then raise StreamEnd
            match !current with
            | Some v ->
                current := None
                v
            // The step pushed nothing (e.g. the value was filtered out): advance again.
            | None -> loop ()
        loop
1: 2: 3: 4: 5: 6: 7: 8: 9: 10: 11: 12: 13: 14: 15: 16: 17: 18:
/// toPull : Stream<'T> -> StreamPull<'T>
/// Buffered push-to-pull conversion. Every value pushed during one
/// TryAdvance step is kept, then handed out one per pull before the source
/// is advanced again. A single step that pushes without bound therefore
/// grows the buffer without bound.
let toPull stream =
    fun () ->
        let pending = new ResizeArray<'T>()
        let { TryAdvance = advance } = stream (fun v -> pending.Add(v))
        // Index of the next buffered value to hand out.
        let cursor = ref 0
        let rec pull () =
            if !cursor < pending.Count then
                let value = pending.[!cursor]
                incr cursor
                value
            else
                // Buffer drained: reset it, then refill with one more step.
                pending.Clear()
                cursor := 0
                if advance () then pull ()
                else raise StreamEnd
        pull
1: 2: 3: 4: 5: 6: 7: 8:
// Shows the limit of the buffered toPull. The first pull advances the source
// by one step. With flatMap, that step presumably runs the whole of
// Stream.infinite (assumed never-ending — not shown here). Every value it
// pushes is appended to toPull's buffer, which grows until memory runs out.
let pull =
    [|1..10|]
    |> Stream.ofArray
    |> Stream.flatMap (fun _ -> Stream.infinite)
    |> Stream.toPull
let next = pull ()
next () // OutOfMemory Exception
Implements a rich set of operations
1: 2: 3: 4: 5: 6:
let data = [| 1..10000000 |]

// Same pipeline as the sequential example, run over partitions in parallel.
data
|> ParStream.ofArray
|> ParStream.filter (fun x -> x % 2 = 0)
|> ParStream.map (fun x -> x * x)
|> ParStream.sum
1: 2: 3: 4: 5: 6: 7: 8: 9: 10: 11:
/// A parallel push stream: it receives a thunk that builds a fresh
/// continuation for each unit of parallel work.
type ParStream<'T> = (unit -> ('T -> unit)) -> unit

/// ofArray : 'T[] -> ParStream<'T>
/// Gives each partition of values its own continuation, forks the work and
/// joins it. partitions, fork and join are helpers not shown here; they
/// presumably split the array, start a job, and wait for all jobs.
let ofArray values =
    fun thunk ->
        // Create every continuation up front, before any job is forked.
        let withContinuations =
            partitions values |> Array.map (fun part -> part, thunk ())
        withContinuations
        |> Array.map (fun (part, k) -> fork part k)
        |> join
1: 2: 3: 4: 5: 6: 7: 8:
/// A parallel push stream: it receives a thunk that builds a fresh
/// continuation for each unit of parallel work.
type ParStream<'T> = (unit -> ('T -> unit)) -> unit

/// map : ('T -> 'R) -> ParStream<'T> -> ParStream<'R>
/// Every continuation the source asks for is a downstream continuation
/// wrapped so that values pass through f first.
let map f stream =
    fun thunk ->
        let makeContinuation () =
            let downstream = thunk ()
            fun v -> downstream (f v)
        stream makeContinuation
1: 2: 3: 4: 5: 6: 7: 8: 9: 10: 11:
/// A parallel push stream: it receives a thunk that builds a fresh
/// continuation for each unit of parallel work.
type ParStream<'T> = (unit -> ('T -> unit)) -> unit

/// sum : ParStream<int> -> int
/// Gives every requested continuation its own accumulator cell, then adds
/// the per-continuation totals after stream returns. ofArray joins its jobs
/// before returning, so all partial totals are final at that point.
let sum stream =
    let partials = new ResizeArray<int ref>()
    stream (fun () ->
        let partial = ref 0
        // NOTE(review): assumes continuations are requested from a single
        // thread (ofArray creates them all before forking); ResizeArray is
        // not safe for concurrent Add.
        partials.Add(partial)
        fun v -> partial := !partial + v)
    partials |> Seq.sumBy (fun partial -> !partial)
Example: a word count
1: 2: 3: 4: 5: 6: 7: 8:
// Word count over a set of cloud files.
cfiles
// One stream element per file, read as its lines.
|> CloudStream.ofCloudFiles CloudFile.ReadLines
// Flatten each file's lines into a single stream of lines.
|> CloudStream.collect Stream.ofSeq
// Split every line into words.
|> CloudStream.collect (fun line -> splitWords line |> Stream.ofArray)
|> CloudStream.filter wordFilter
// (word, occurrences) pairs.
|> CloudStream.countBy id
// Highest counts first; count presumably caps how many results are kept.
|> CloudStream.sortBy (fun (_,c) -> -c) count
|> CloudStream.toCloudArray
Write beautiful functional code with the performance of imperative code.
http://research.microsoft.com/en-us/um/people/simonpj/papers/ndp/haskell-beats-C.pdf
In principle, such pipelines can always be fused (inlined).
Not always done by F#/OCaml compilers.
by @biboudis
https://github.com/biboudis/sml-streams
MLton appears to always be fusing.