core.async-intro

An intro for Clojure's core.async library

On Github narkisr / core.async-intro

core.async

Clojure's take on async programming

Created by narkisr / @narkisr

  • Clojure intro
  • Why async?
  • core.async walkthrough
  • core.async on JS

Clojure

  • LISP dialect
  • Concurrency tamed
  • Meta programmable

Calling

; prefix notation
(+ 1 1)            ; infix equivalent: 1 + 1;

; call nesting
(println (+ 1 2))  ; infix equivalent: println(1 + 2);

; interop: calls the length method of the string "abc"
(.length "abc")

Data structures

; vector
[1 2 3]

; list 
'(1 2 3)

; map 
{1 2} 
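
A small sketch of reading and extending these structures. They are all immutable, so every call returns a new value and leaves the original alone. The names v, l and m are illustrative only.

```clojure
(def v [1 2 3])    ; vector
(def l '(1 2 3))   ; list
(def m {1 2})      ; map: key 1 -> value 2

(def v2 (conj v 4))     ; vectors grow at the end:  [1 2 3 4]
(def l2 (conj l 0))     ; lists grow at the front:  (0 1 2 3)
(def found (get m 1))   ; look up the value stored under key 1
(def m2 (assoc m 3 4))  ; a new map with one more entry; m is unchanged
```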

Defining/Scoping

; define a value
(def a 2)

(defn -main [name]
  (println "hello " name))

(defn scope []
  (let [a 1 b 2]
    (println a b))) ; 1 2 

Control structures

(while true
  (println 1))

(if (= a 2) 
   (println "its two")
   (println "not two"))

(when foo (println "im foo"))

(loop [i 0]
 (if (= i 10)
   (println "got to 10!")
   (recur (inc i)))) 

Why async?

  • Better use of threads
  • We have to (JS has a single thread)
  • Coordination

core.async

  • Same model used in Go (CSP-style channels)
  • Just a library
  • Targets JVM/JS

Callbacks

; a many-to-many channel:
; it can serve many puts/takes
(def c (chan))

; returns nil right away: the channel is empty, so the callback waits
(take! c (fn [v] (println v)))

; delivers the value, which triggers the print
(put! c "hello world")

But we are in callback hell!
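
A minimal sketch of how the nesting creeps in: reading two values in sequence already needs a callback inside a callback. The channels c1 and c2 and the promise result are hypothetical names used only for this illustration.

```clojure
(require '[clojure.core.async :refer [chan put! take!]])

(def c1 (chan))
(def c2 (chan))
(def result (promise))

; each dependent step adds one more level of nesting
(take! c1
       (fn [a]
         (take! c2
                (fn [b]
                  (deliver result (+ a b))))))

(put! c1 1)
(put! c2 2)

; wait up to one second for both callbacks to fire
(def sum (deref result 1000 :timeout))
```

With three or four dependent steps the code drifts further to the right and error handling has to be repeated at every level.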

Blocking put/take

(def c (chan))

; >!! means: > puts onto the channel, !! blocks the calling thread
; (a single ! marks an operation that is not transaction safe)
(future (println "done" (>!! c 42)))

; once the value is delivered, prints "Got! 42"
(future (println "Got!" (<!! c)))

; like future, but returns a channel that receives the result
(println "It works!" (<!! (thread 42)))

But!

Every blocked operation still ties up a real thread

Go macro

  • A logical thread
  • Turns code into a state machine
  • Inspects channel operations
  • Parks blocking operations
  • Un-Parks when ready to run

Go

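; returns a channel that will receive the result of the body
(go 42)

; blocks until the result arrives, then returns 42
(<!! (go 42))

; notice the use of <! (parks, inside go) vs <!! (blocks, outside go)
(<!! (go (println "It works!" (<! (go 42)))))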

Park vs Block

  • <!! >!! block the calling thread until the operation completes
  • <! >! park the go block, freeing its thread (from a fixed-size pool) to run other go blocks
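
The difference shows up when many logical threads wait at once. The sketch below parks 1000 go blocks on a single channel, far more than the go thread pool has threads. The channel names in and out are assumptions made for the example; the count stays below the 1024 pending operations a channel accepts.

```clojure
(require '[clojure.core.async :refer [chan go <! >! <!! >!!]])

(def in  (chan))
(def out (chan 1000)) ; buffered so the go blocks never wait on the reader

; 1000 logical threads, each parking on the same channel
(dotimes [_ 1000]
  (go (>! out (* 2 (<! in)))))

; each blocking put hands one value to one parked go block
(dotimes [i 1000]
  (>!! in i))

; collect every result with blocking takes from the calling thread
(def total (reduce + (repeatedly 1000 #(<!! out))))
```

Doing the same with (thread ...) and <!! would need 1000 real threads, each blocked while it waits.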

Buffered channels

  • Back pressure mechanism
  • Producer/Consumer sync

Buffered ..

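; fixed buffer with room for one value
(def fbc (chan 1))

; whichever put runs first fits in the buffer and prints "done" right away
(go (>! fbc 1)
    (println "done"))

; the other put parks until a take frees the buffer
(go (>! fbc 2)
    (println "done"))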

(<!! fbc)
(<!! fbc)
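
Back pressure can also be observed without parking. This is a sketch that assumes a core.async release that ships the non-blocking offer! and poll! functions; the channel name bc is illustrative.

```clojure
(require '[clojure.core.async :refer [chan offer! poll!]])

(def bc (chan 2))

(def results
  [(offer! bc :a)    ; buffer has room
   (offer! bc :b)    ; buffer is now full
   (offer! bc :c)    ; rejected: the producer has to back off
   (poll! bc)        ; a take frees one slot
   (offer! bc :c)])  ; accepted this time

; results => [true true nil :a true]
```

With >! or >!! the third put would park or block instead of returning nil, which is what keeps a fast producer in step with a slow consumer.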
	    

Dropping buffers

  • Don't block
  • Discard messages

Dropping ..

; a full dropping-buffer discards new values
; (a sliding-buffer drops from the start instead)
(def fbc (chan (dropping-buffer 1)))

(go (>! fbc 1)
    (println "done"))

(go (>! fbc 2)
    (println "done")) 

; we get only the first value put, normally 1 (the other was thrown away)
(<!! fbc) 
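
For comparison, a minimal sketch of the sliding-buffer variant, which keeps the newest value and discards the oldest. The name sc is illustrative.

```clojure
(require '[clojure.core.async :refer [chan sliding-buffer >!! <!!]])

(def sc (chan (sliding-buffer 1)))

(>!! sc 1) ; buffered
(>!! sc 2) ; pushes 1 out; puts on a sliding buffer never block

(def latest (<!! sc)) ; 2
```

A sliding buffer suits "latest value wins" data such as sensor readings, while a dropping buffer protects work that is already queued.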

Closing

 (def c (chan))

(close! c)
    
; taking from a closed, empty channel returns nil
(<!! c)
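
Closing does not throw away values that are already buffered. A short sketch, using the illustrative name cc:

```clojure
(require '[clojure.core.async :refer [chan close! >!! <!!]])

(def cc (chan 2))

(>!! cc :a)
(close! cc)

(def drained   (<!! cc))    ; :a, buffered values survive the close
(def exhausted (<!! cc))    ; nil, the channel is closed and empty
(def accepted? (>!! cc :b)) ; false, puts on a closed channel are rejected
```

This is why nil is never a legal channel value: it is reserved to signal "closed".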
	    

Real world example

Tinymasq

 (def lookups (chan (dropping-buffer 100)))
(def answers (chan (dropping-buffer 100)))

(defn accept-loop []
  (go 
    (while true
      (let [pkt (packet (byte-array 1024))]
        (.receive @udp-server pkt)
        (>! lookups pkt))))) 

Processing

 (defn process-loop []
  (go 
    (while true
      (let [pkt (<! lookups) 
            message (Message. (.getData pkt)) 
            record (.getQuestion message) 
            host (.toString (.getName record) false)
            ip (get-host (normalized-host host))]
        (when ip
          (.addRecord message (record-of host (into-bytes ip)) Section/ANSWER))
        (.setData pkt (.toWire message))
        (>! answers pkt))))) 

Reply

 (defn reply-loop []
  (go
    (while true
     (let [pkt (<! answers)] 
       (.send @udp-server pkt))))) 

Multiplexing

(def a (chan))
(def b (chan))

(put! a 42)

; returns [42 a]: the value and the channel it came from
(alts!! [a b])

; nothing is ready now, so :default applies: returns [:meh :default]
(alts!! [a] :default :meh)

Timeout

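; blocks for one second, then returns nil (the timeout channel closes)
(<!! (timeout 1000))

; nothing arrives on a, so after a second: [nil timeout-channel]
(alts!! [a (timeout 1000)])

; alts with a put: a has no takers, so again [nil timeout-channel]
(alts!! [[a 42] (timeout 1000)])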
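
The same pattern can be wrapped in a small helper. take-or-timeout is a hypothetical name, not part of core.async, and the :timed-out marker is an arbitrary choice for this sketch.

```clojure
(require '[clojure.core.async :refer [chan put! timeout alts!!]])

(defn take-or-timeout
  "Hypothetical helper: takes from ch, or gives up after ms milliseconds."
  [ch ms]
  (let [[v port] (alts!! [ch (timeout ms)])]
    ; alts!! reports which port completed, so compare against ch
    (if (= port ch) v :timed-out)))

(def quiet (chan))
(def busy  (chan))
(put! busy 42)

(def missed (take-or-timeout quiet 100)) ; :timed-out
(def got    (take-or-timeout busy 100))  ; 42
```

Checking the returned port rather than the value matters because a closed channel also yields nil.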

Alts ordering

(put! a 1)
(put! b 2)

; when several ports are ready the choice is random (prevents starvation)
(alts!! [a b])

; with :priority true the ports are tried in vector order
(alts!! [a b] :priority true)

Mult and Tap

 (def to-mult (chan 1))
(def m (mult to-mult))

(dotimes [n 4]
  (let [c (chan 1)]
    (tap m c)
    (go
      (loop []
        (if-let [v (<! c)]
          (do (println "Got! " v)
              (recur))
          ; closing to-mult closes every tap, so <! returns nil
          (println "Exiting!"))))))

(>!! to-mult 42)
(>!! to-mult 43)

(close! to-mult) 

Pubsub

 (def to-pub (chan 1))
(def p (pub to-pub :type))

(let [c (chan 1)]
  (sub p :error c)
  (go
    ; stop looping once the subscription closes and <! returns nil
    (loop []
      (when-let [e (<! c)]
        (println "got an error" e)
        (recur)))))

(>!! to-pub {:type :error :msg "bad thing"})
(close! to-pub) 

ClojureScript

  • Clojure compiled to JS
  • Not full Clojure but pretty close
  • core.async supports it
  • No threads, just go blocks

10k processes

 (let [render (render-loop 40)]
  (loop [i 0]
    (when (< i (* width height))
      (go 
        (while true
          ; sleeping for a random time
          (<! (timeout (+ 1000 (rand-int 10000))))
          ; passing position and color 
          (>! render [(rand-int 10000) (rand-int 10)])))
      (recur (inc i))))) 

Thank you!

@narkisr https://github.com/narkisr