Pedestal App

Functional dataflow/reactive programming in ClojureScript

Konrad Garus / squirrel.pl / @trzmiel

Clojure

Hello world


(ns my-namespace)            ; Namespace

(def greeting "Hello")       ; Var named greeting, value "Hello"

(defn greet [name]           ; Function named greet, has one arg: name
  (println greeting name))   ; ... and prints to standard output
  


(ns my-other-namespace)
(my-namespace/greet "World") ; Function call
					

Primitive types


; Numbers
(def my-numbers [1 1/3 2.5])

; Strings
(def my-string "Hello")

; Keywords
(def my-keyword :greeting)
						

Sequential


; List
(def my-list '(1 2 3 4))
(def my-list-2 (list 1 2 3 4))

; Vector
(def my-vec [1 2 3 4])
(def my-vec-2 (vector 1 2 3 4))
						

Set


(def kids #{"Gabby", "Nicky"})
(def kids-2 (set ["Gabby" "Nicky"]))

(kids "Gabby")
; => "Gabby"

(kids "Jimmy")
; => nil

						

Map


(def car {:brand "Ford", :model "Fusion", :year 2005 })

(car :brand)
; => "Ford"

(:model car)
; => "Fusion"

(assoc car :year 2010)
; => {:brand "Ford", :model "Fusion", :year 2010 }
						

Map


(def people {:people {:list [{:name "Johnny" :salary 20000}
                             {:name "Jim" :salary 12000}]}})
                           
(update-in people [:people :list] conj {:name "Tommy" :salary 5000})
;=> {:people  {:list [{:name "Johnny", :salary 20000} 
;                     {:name "Jim", :salary 12000} 
;                     {:name "Tommy", :salary 5000}]}}

(println (map :name (get-in people [:people :list]))) 
;=> (Johnny Jim) 

(assoc-in people [:people :team] "Apollo 13")
;=> {:people {:team "Apollo 13", 
;             :list [{:name "Johnny", :salary 20000} 
;                    {:name "Jim", :salary 12000}]}}

Atom

Holds value that can be replaced with isolated, atomic operations.

(def counter (atom 0))

; Deref to get value:
(println @counter)      ;=> 0

; Atomically update value with inc
; Logically counter := (inc counter)
(swap! counter inc)     ;=> 1
(println @counter)      ;=> 1
; Logically counter := (+ counter 41)
(swap! counter + 41)    ;=> 42

; Atomically set value
; Logically counter := 11
(reset! counter 11)     ;=> 11

Atom as Model

Common practice: Model as a tree (nested map) in atom.

(def model (atom {:people 
                   {:list [{:name "Johnny" :salary 20000}
                           {:name "Jim" :salary 12000}]}}))
                           
(swap! model update-in [:people :list] conj {:name "Tommy"
                                             :salary 5000})

(println (map :name (get-in @model [:people :list]))) 
;=> (Johnny Jim Tommy)

(swap! model assoc-in [:people :team] "Apollo 13")
;=> {:people {:team "Apollo 13", 
;             :list [{:name "Johnny", :salary 20000} 
;                    {:name "Jim", :salary 12000} 
;                    {:name "Tommy", :salary 5000}]}}
						

Binding


(let [values [1 2 3]]
  (println values))
; => [1 2 3]

List Destructuring


(let [values [1 2 3]
      [x y & xs] values]
  (println "x" x ", y" y ", xs" xs))
; => x 1 , y 2 , xs (3)

(defn print-first [[x & xs]]
  (println x))
(print-first [1 2 3])
; => 1

Map Destructuring


(let [values {:name "John" :born 1970}
      {:keys [name born]} values]
  (println "Name:" name ", born: " born))
; => Name: John , born:  1970

(defn print-name [{name-to-print :name}]
  (println name-to-print))

(print-name {:name "John" :born 1982})
; => John

ClojureScript

What is ClojureScript?

Clojure compiled to JavaScript with Google Closure.

Why?

  • Replace JS with expressive, clean, powerful, modern language: Clojure
  • Same language on server and client
  • Same code on server and client
  • Leverage Google Closure for:
    • Modularity
    • Optimization
    • Decent standard library

Hello world


(ns my-namespace)            

(def greeting "Hello")       

(defn greet [name]           
  (.log js/console greeting name))

(greet "World")              
					
Compiles to:

goog.provide("my_namespace");
goog.require("cljs.core");
my_namespace.greeting = "Hello";
my_namespace.greet = function greet(name) {
  return console.log(my_namespace.greeting, name)
};
my_namespace.greet.call(null, "World");
					

Hello world


(ns my-namespace)            

(def greeting "Hello")       

(defn greet [name]           
  (.log js/console greeting name))

(greet "World")              
					
Compiles to (simple optimizations):
					
var my_namespace = {greeting:"Hello", greet:function(a) {
  return console.log(my_namespace.greeting, a)
}};
my_namespace.greet.call(null, "World");
					

Hello world


(ns my-namespace)            

(def greeting "Hello")       

(defn greet [name]           
  (.log js/console greeting name))

(greet "World")              
					
Compiles to (advanced optimizations):

console.log("Hello", "World");
					

JavaScript Interop

  • ClojureScript collections are not JavaScript objects, arrays etc.
  • They are the same immutable, persistent data structures as in Clojure.
  • This does not matter as long as we write pure ClojureScript. For JS libraries you can convert:
    
    (.log js/console (clj->js {:name "Jimmy" :born 1977}))
    ;=> Prints: Object {name: "Jimmy", born: 1977} 
    
    (def user (js-obj "name" "Jimmy" "born" 1977))
    (aset user "name" "Johnny")
    (.log js/console (aget user "name"))
    ;=> Johnny
    
    (let [clj-user (js->clj user)]
      (clj-user "born"))
    ;=> 1977
    
    					

JavaScript Interop

  • ClojureScript has almost all of Clojure, including macros and persistent data structures.
  • You can use most JavaScript features. Not always the best idea for idiomatic code, but there is full interop if you need it.
  • If using external libraries, you need to understand Google Closure: externs, exports etc.
  • Outside scope of this presentation.

Pedestal

Creators / Maintainers

Created by Relevance, known today as Cognitect after its merger with Rich Hickey's Metadata Partners.

Random Info

  • Eclipse Public License
  • First GitHub commit: March 2013. Last: October 2013.

Demo Time

Functional Reactive Programming (FRP)

Dataflow Programming

  • Continuous flow of events / data updates over time
  • Other values / streams defined as derivatives
  • Dataflow analogy: Spreadsheet
  • Multiple inputs and outputs
  • Abstract away dependencies from computation, dataflow from inputs and outputs
  • + Functional: pure functions, immutable values...
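
A minimal sketch of the spreadsheet analogy, using plain Clojure atoms and watches rather than Pedestal's dataflow engine. The names cell-a, cell-b, cell-sum and recompute-sum! are made up for illustration.

```clojure
;; Toy spreadsheet: two input cells and one derived cell.
;; Plain atoms and watches, NOT how Pedestal implements dataflow.
(def cell-a (atom 1))
(def cell-b (atom 2))
(def cell-sum (atom (+ @cell-a @cell-b)))

(defn recompute-sum!
  "Watch callback: recompute the derived cell whenever an input changes."
  [_key _ref _old _new]
  (reset! cell-sum (+ @cell-a @cell-b)))

(add-watch cell-a :sum recompute-sum!)
(add-watch cell-b :sum recompute-sum!)

(reset! cell-a 10)
(println @cell-sum) ; 12
```

The inputs know nothing about the derived cell; the dependency is declared once, outside the computation. Pedestal expresses the same idea as data in the :derive set.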

App Overview

Stage 1: First input and transform

Counter of messages received

Main


(ns monitor-client.start
  (:require [io.pedestal.app.protocols :as p]
            [io.pedestal.app :as app]
            [io.pedestal.app.render.push :as push-render]
            [io.pedestal.app.render :as render]
            [io.pedestal.app.messages :as msg]
            [monitor-client.behavior :as behavior]
            [monitor-client.rendering :as rendering]))

(defn create-app [render-config]
  (let [app (app/build behavior/monitor-app)
        render-fn (push-render/renderer 
                     "content" render-config render/log-fn)
        app-model (render/consume-app-model app render-fn)]
    (app/begin app)
    {:app app :app-model app-model}))

(defn ^:export main []
  (create-app (rendering/render-config)))
					

Main (Simulated Services)


(ns monitor-client.simulated.start
  (:require [monitor-client.start :as start]
            [monitor-client.rendering :as rendering]
            [io.pedestal.app.protocols :as p]
            [monitor-client.simulated.services :as services]))
            			
(defn ^:export main []
  (let [render-config (rendering/render-config)
        app (start/create-app render-config)
        services (services/->MockServices (:app app))]
    (p/start services)
    app))
					

Simulated Services



(ns monitor-client.simulated.services
  (:require [io.pedestal.app.protocols :as p]
            [io.pedestal.app.messages :as msg]
            [io.pedestal.app.util.platform :as platform]))

(def received-count (atom 0))

(def freq-ms 1000)

(defn receive-messages [input-queue]
  (advance-state)
  (let [ts (.getTime (js/Date.))
        ts-seconds (* (int (/ ts 1000)) 1000)]
    (p/put-message input-queue {msg/type :set-value
                                msg/topic [:received :count]
                                :value @received-count 
                                :tstamp ts-seconds }))
  
  (platform/create-timeout freq-ms #(receive-messages input-queue)))

(defrecord MockServices [app]
  p/Activity
  (start [this] (receive-messages (:input app)))
  (stop [this]))
					

Transform

Purpose: Update data model in response to message from input queue.

Transform Dataflow

Reference:

; Transform binding (first matching wins):
(def app {:transform [[type topic handler-fn]
                      [type topic handler-fn]]})

; Handler:
(defn handler-fn [old-value message]
  (do-stuff))					
					
Implementation:

(ns ^:shared monitor-client.behavior)

(defn set-received-count [old message]
  (let [{:keys [tstamp value]} message]
    [tstamp value]))

(def monitor-app
  {:transform [[:set-value [:received :count] set-received-count]]})					
					

{msg/type :set-value             |
 msg/topic [:received :count]    |=>   [1382906411000 17]   
 :value 17                       |
 :tstamp 1382906411000 }         |
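
Because the handler is a pure function, the mapping above can be checked at a REPL without starting the app. A small sketch, assuming plain keywords as stand-ins for msg/type and msg/topic so that it runs without Pedestal:

```clojure
;; Stand-ins for msg/type and msg/topic (assumption: plain keywords,
;; so the sketch runs without Pedestal on the classpath).
(def msg-type :type)
(def msg-topic :topic)

(defn set-received-count
  "Transform handler from the slide: ignores the old value and
  returns a [timestamp count] pair taken from the message."
  [old message]
  (let [{:keys [tstamp value]} message]
    [tstamp value]))

(def sample-message
  {msg-type  :set-value
   msg-topic [:received :count]
   :value    17
   :tstamp   1382906411000})

(println (set-received-count nil sample-message))
;; prints [1382906411000 17]
```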
					

Result

Stage 2: Derive

History of messages received counter.

Derive

Purpose: Update data model in response to other update(s) of data model.

Dataflow (derive)

Reference:

; Derive binding (executed repeatedly as long as there are any matches):
(def app {:derive #{[#{inputs} output function arg-type]
                    [#{inputs} output function arg-type]}})

; Handler:
(defn derive-fn [old-value input] (do-stuff))					
					
Implementation:

(defn derive-received-count-history [old val]
  (let [history (concat old [val])]
    (take-last 5 history)))

(def monitor-app
  {:transform [...], :derive #{[#{[:received :count]} 
                               [:received :count-history] 
                               derive-received-count-history
                               :single-val]}})
   
[[1382906411000 17]      |         [[1382906411000 17]
 [1382906412000 23]]     |     =>   [1382906412000 23]
                         |          [1382906413000 25]]
[1382906413000 25]       |
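
The derive handler can be exercised the same way. The sketch below feeds seven made-up samples through it with reduce, roughly the way the dataflow engine would call it once per update; the sample data is hypothetical.

```clojure
(defn derive-received-count-history
  "Derive handler from the slide: append the new value, keep the last 5."
  [old val]
  (let [history (concat old [val])]
    (take-last 5 history)))

;; Seven hypothetical [timestamp count] samples, one second apart.
(def samples
  (for [i (range 7)]
    [(+ 1382906411000 (* 1000 i)) (* 3 i)]))

(def history (reduce derive-received-count-history nil samples))

(println (count history)) ; 5
(println (first history)) ; [1382906413000 6]
```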

Result

Stage 3: Emit and Render

Let's put it on a chart!

Emit

Purpose: Emit changes in data model to renderers in response to changes in data model.

Reminder: App model changes have their own queue.

Model: Vector of deltas

(def sample-deltas
  [[:node-create [] :map]
   [:node-create [:received-counts] :map]
   [:value [:received-counts] nil '([1382908458000 9])]])
					

Types of deltas

:node-create, :node-destroy
Create/destroy node in app model tree


;            [:node-create  path               node-type]
(def deltas [[:node-create  [:received-counts] :map     ]
             [:node-destroy [:received-counts]]])

					
:value, :attr
Set value in app model tree

;          [:value path               old current             ]
(def delta [:value [:received-counts] nil '([1382908458000 9])])
					
:transform-enable, :transform-disable
Control behavior

;          [:transform-enable path          type    messages]
(def delta [:transform-enable [:connected]  :start  [{msg/topic [:connected] 
                                                      msg/type :start}]])
					

App model is not stored, but tree is a good DOM abstraction.
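
Since the app model is only a stream of deltas, a consumer that wants the current state has to fold the deltas itself. A rough sketch, assuming a flat map from path to value is enough; apply-delta is a hypothetical helper, not part of Pedestal.

```clojure
(def sample-deltas
  [[:node-create [] :map]
   [:node-create [:received-counts] :map]
   [:value [:received-counts] nil [[1382908458000 9]]]])

(defn apply-delta
  "Hypothetical consumer: keeps a flat map of path -> current value."
  [state [op path & args]]
  (case op
    :node-create  (assoc state path nil)
    :node-destroy (dissoc state path)
    :value        (let [[_old current] args]
                    (assoc state path current))
    state)) ; other delta types are ignored in this sketch

(def state (reduce apply-delta {} sample-deltas))

(println (get state [:received-counts]))
;; prints [[1382908458000 9]]
```

A renderer does the same kind of dispatch on delta type and path, except that it updates the DOM instead of a map.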

Dataflow (emit)

Reference:

; Emit binding (all matching are applied)
(def app {:emit [{:in #{inputs} :init init-function :fn function}
                 {:in #{inputs} :init init-function :fn function}]})

; Handler:
(defn emit-fn [state] (do-stuff))
; state includes old model, current model, message, input paths...			
					
Implementation:

(defn init-recd-count-history []
  [[:node-create [:received-counts] :map]])

(defn emit-recd-count-history [{:keys [new-model]}]
  [[:value 
    [:received-counts] 
    (get-in new-model [:received :count-history])]])

(def monitor-app
  {:emit [{:in #{[:received :count-history]}
           :fn emit-recd-count-history
           :init init-recd-count-history}]})
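
To see which deltas these two functions produce, they can be called directly. The state map below is a made-up example; only the :new-model key is read by this emitter.

```clojure
(defn init-recd-count-history []
  [[:node-create [:received-counts] :map]])

(defn emit-recd-count-history [{:keys [new-model]}]
  [[:value
    [:received-counts]
    (get-in new-model [:received :count-history])]])

;; Hypothetical emitter input.
(def sample-state
  {:new-model {:received {:count [1382906412000 23]
                          :count-history [[1382906411000 17]
                                          [1382906412000 23]]}}})

(println (init-recd-count-history))
;; prints [[:node-create [:received-counts] :map]]
(println (emit-recd-count-history sample-state))
;; prints [[:value [:received-counts] [[1382906411000 17] [1382906412000 23]]]]
```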
   

Renderer

Reference:

(defn renderer-fn [renderer delta input-queue] (do-stuff))					
					
Implementation:

(ns monitor-client.rendering)

(def received-counts-chart (atom nil))

(defn prepare-received-counts-chart [_ _ _]
  (reset! received-counts-chart ($/plot "#received_counts" 
                                       (clj->js [])
                                       (clj->js {:xaxis { :mode "time"}}))))
; $.plot("#received_counts", [], {"xaxis": {"mode": "time"}})

(defn render-received-counts-chart [_ [_ _ _ new-value] _]
  (let [data new-value]
    (.setData @received-counts-chart (clj->js [data]))
    (.setupGrid @received-counts-chart)
    (.draw @received-counts-chart)))
					
(defn render-config []
  [[:node-create [:received-counts] prepare-received-counts-chart]
   [:value [:received-counts] render-received-counts-chart]])
   

Result

Recap

What we have done so far.

App Overview

Main


(defn create-app [render-config]
  (let [app (app/build behavior/monitor-app)
        render-fn (push-render/renderer 
                     "content" render-config render/log-fn)
        app-model (render/consume-app-model app render-fn)]
    (app/begin app)
    {:app app :app-model app-model}))

(defn ^:export main []
  (create-app (rendering/render-config)))
					

Behavior


(defn set-received-count [old message]
  (let [{:keys [tstamp value]} message]
    [tstamp value]))

(defn derive-received-count-history [old val]
  (let [history (concat old [val])]
    (take-last 60 history)))

(defn init-recd-count-history []
  [[:node-create [:received-counts] :map]])

(defn emit-recd-count-history [{:keys [old-model new-model]}]
  [[:value 
    [:received-counts] 
    (get-in new-model [:received :count-history])]])

(def monitor-app
  {:version 2
   :transform [[:set-value [:received :count] set-received-count]]
   :derive #{[#{[:received :count]} 
              [:received :count-history] 
              derive-received-count-history 
              :single-val]}
   :emit [{:in #{[:received :count-history]}
           :fn emit-recd-count-history
           :init init-recd-count-history}]})
					

Rendering


(ns monitor-client.rendering)

(def received-counts-chart (atom nil))

(defn prepare-received-counts-chart [_ _ _]
  (reset! received-counts-chart ($/plot "#received_counts" 
                                       (clj->js [])
                                       (clj->js {:xaxis { :mode "time"}}))))

(defn render-received-counts-chart [_ [_ _ _ new-value] _]
  (let [data new-value]
    (.setData @received-counts-chart (clj->js [data]))
    (.setupGrid @received-counts-chart)
    (.draw @received-counts-chart)))
					
(defn render-config []
  [[:node-create [:received-counts] prepare-received-counts-chart]
   [:value [:received-counts] render-received-counts-chart]])
   

Stage 4: Behavior and effects

Goal: Add a button to manually initiate connection to server.

Idea

  • "Connect" button is shown on launch.
  • When pressed, send a message to input queue to initialize connection.
  • When processing this message, call the services to initialize connection.

Transform enable/disable


(defn init-connected [arg]
  [[:transform-enable [:connected] :start
    [{msg/topic [:connected] msg/type :start}]]])
    
(defn emit-connected [{:keys [new-model] :as v}]
  (when (get-in new-model [:connected])
    [[:transform-disable [:connected] :start]]))

(def monitor-app {:emit [{:in #{[:connected]}
                          :fn emit-connected
                          :init init-connected}
                         ;...
                        ]})
					
The (optional) messages in :transform-enable will be sent when the button bound to this transform is clicked.

Rendering




(ns monitor-client.rendering
  (:require ; ...
            [io.pedestal.app.render.push.handlers :as h]
            [jayq.core :refer [$]]))					
					
(defn hide-connect-button [_ _ _]
  (.hide ($ :#connect_button)))
					
(defn render-config []
  [ ; ...
   [:transform-enable [:connected] (h/add-send-on-click "connect_button")]
   [:transform-disable [:connected] hide-connect-button]])
					

Back to behavior: Effects

Send messages to services in response to data model changes.

Reference:


; Effect binding (all matching are applied)
(def app {:effect #{[#{inputs} effect-fn arg-type]
                    [#{inputs} effect-fn arg-type]}})

; Handler:
(defn effect-fn [arg] (do-stuff))			
					
Implementation:

(defn connect [_ message] true)

(defn start-connection [val]
  (when val [{msg/topic [:connect] :value true}]))

(def monitor-app
  {:transform [[:start [:connected] connect]]
   :emit [ ; ...
          {:in #{[:connected]}
           :fn emit-connected
           :init init-connected}]
 
   :effect #{[#{[:connected]} start-connection :single-val]}})
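
The transform and effect handlers are again plain functions, so the chain "message in, new value, message out" can be traced by hand. A sketch, assuming a plain keyword as a stand-in for msg/topic:

```clojure
;; Stand-in for msg/topic (assumption, so this runs without Pedestal).
(def msg-topic :topic)

(defn connect
  "Transform handler: any :start message sets [:connected] to true."
  [_ message]
  true)

(defn start-connection
  "Effect handler: returns the messages to send to the services."
  [val]
  (when val [{msg-topic [:connect] :value true}]))

(println (start-connection (connect nil {})))
;; prints [{:topic [:connect], :value true}]
(println (start-connection nil))
;; prints nil
```

Returning nil means "no effect", so nothing is sent to the services until [:connected] becomes truthy.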
					

Services


(def connected (atom false))
					
(defn receive-messages [input-queue]
  (advance-state)
  (let [ts (.getTime (js/Date.))
        ts-seconds (* (int (/ ts 1000)) 1000)]
    (when @connected
      (p/put-message input-queue {msg/type :set-value
                                  msg/topic [:received :count]
                                  :value @received-count 
                                  :tstamp ts-seconds })))
  
  (platform/create-timeout freq-ms #(receive-messages input-queue)))
  
(defn services-fn [message input-queue]
  (.log js/console (str "Sending message to server: " message))
  (case (msg/topic message)
    [:connect] (reset! connected (:value message))))
					

Main


(defn ^:export main []
  (let [ ; (Prepare renderer...)
        app (start/create-app render-config)
        input-queue (get-in app [:app :input])
        services (services/->MockServices (:app app))]
    (app/consume-effects (:app app) services/services-fn) ; <--
    (p/start services)
    app))
					

Result

Stage 5: Replace counter with TPS

Goal: Instead of drawing increasing counter, draw transactions per second.

Behavior


(defn derive-received-tps [_ {:keys [old-model new-model]}]
  (if-let [[_ old-count] (get-in old-model [:received :count])]
    (let [[new-time new-count] (get-in new-model [:received :count])]
      [new-time (- new-count old-count)])))

(defn derive-history [old val]
  (when val (let [history (concat old [val])]
              (take-last 60 history))))

(defn init-recd-tps-history []
  [[:node-create [:received-tps] :map]])

(defn emit-recd-tps-history [{:keys [old-model new-model]}]
  [[:value [:received-tps] (get-in new-model [:received :tps-history])]])
  
(def monitor-app 
  {:derive #{[#{[:received :count]} [:received :tps] derive-received-tps]
             [#{[:received :tps]} [:received :tps-history] derive-history :single-val]}
   :emit [{:in #{[:received :tps-history]}
           :fn emit-recd-tps-history
           :init init-recd-tps-history}]})
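
A quick check of the TPS derivation with made-up models. Without an arg-type the derive function receives a map with the old and new model, so two consecutive counter values are enough to try it out:

```clojure
(defn derive-received-tps [_ {:keys [old-model new-model]}]
  (if-let [[_ old-count] (get-in old-model [:received :count])]
    (let [[new-time new-count] (get-in new-model [:received :count])]
      [new-time (- new-count old-count)])))

;; Hypothetical models, one second apart.
(def old-model {:received {:count [1382906411000 17]}})
(def new-model {:received {:count [1382906412000 23]}})

(println (derive-received-tps nil {:old-model old-model
                                   :new-model new-model}))
;; prints [1382906412000 6]

;; On the very first update there is no old count, so the result is nil.
(println (derive-received-tps nil {:old-model {}
                                   :new-model old-model}))
;; prints nil
```

That nil is why derive-history guards with (when val ...).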
					

Rendering

No changes in implementation, just renaming paths and vars. This:

(defn render-config []
  [[:node-create [:received-counts] prepare-received-counts-chart]
   [:value [:received-counts] render-received-counts-chart]])
					
... becomes:

(defn render-config []
  [[:node-create [:received-tps] prepare-received-tps-chart]
   [:value [:received-tps] render-received-tps-chart]])
					

... and so on.

Functions to prepare and render the chart are the same.

Result

Stage 6: "Processed" counter

Goal: Add another data stream - total number of messages processed by system.

Services


(def received-count (atom 0))
(def processed-count (atom 0))

(defn receive-messages [input-queue]
  (advance-state)
  (let [ts (.getTime (js/Date.))
        ts-seconds (* (int (/ ts 1000)) 1000)]
    (when @connected
      (p/put-message input-queue {msg/type :set-value
                                  msg/topic [:received :count]
                                  :value @received-count :tstamp ts-seconds })
      (p/put-message input-queue {msg/type :set-value
                                  msg/topic [:processed :count]
                                  :value @processed-count :tstamp ts-seconds })))
  
  (platform/create-timeout freq-ms #(receive-messages input-queue)))
					

Behavior - dataflow


(def monitor-app
  {:transform [[:set-value [:received :count] set-count]
               [:set-value [:processed :count] set-count]]
   :derive #{[#{[:received :count]} [:received :tps] derive-tps]
             [#{[:received :tps]} [:received :tps-history] derive-history :single-val]
             
             [#{[:processed :count]} [:processed :tps] derive-tps]
             [#{[:processed :tps]} [:processed :tps-history] derive-history :single-val]}
   :emit [{:in #{[:received :tps-history] [:processed :tps-history]}
           :fn emit-tps-history
           :init init-tps-history}]})					
					

Behavior - implementation


(defn derive-tps [_ {:keys [old-model new-model input-paths]}]
  (let [input-path (first input-paths)]
    (if-let [[_ old-count] (get-in old-model input-path)]
      (let [[new-time new-count] (get-in new-model input-path)]
        [new-time (- new-count old-count)]))))

(defn init-tps-history []
  [[:node-create [:tps] :map]])

(defn emit-tps-history [{:keys [old-model new-model]}]
  (let [recd (get-in new-model [:received :tps-history])
        processed (get-in new-model [:processed :tps-history])
        [last-recd-ts _] (last recd)
        [last-processed-ts _] (last processed)]
    (when (= last-recd-ts last-processed-ts)
      [[:value [:tps] {:received recd :processed processed}]])))
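
The timestamp comparison in emit-tps-history means a delta is emitted only when both histories have caught up to the same second. A sketch with made-up histories:

```clojure
(defn emit-tps-history [{:keys [old-model new-model]}]
  (let [recd (get-in new-model [:received :tps-history])
        processed (get-in new-model [:processed :tps-history])
        [last-recd-ts _] (last recd)
        [last-processed-ts _] (last processed)]
    (when (= last-recd-ts last-processed-ts)
      [[:value [:tps] {:received recd :processed processed}]])))

;; Hypothetical model: both streams updated for the same second.
(def in-sync
  {:received  {:tps-history [[1000 5] [2000 7]]}
   :processed {:tps-history [[1000 4] [2000 6]]}})

;; Hypothetical model: "processed" has not been updated yet.
(def out-of-sync
  {:received  {:tps-history [[1000 5] [2000 7]]}
   :processed {:tps-history [[1000 4]]}})

(println (emit-tps-history {:new-model in-sync}))
(println (emit-tps-history {:new-model out-of-sync})) ; nil
```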
					

Behavior - wildcards


(def monitor-app
  {:transform [[:set-value [:**] set-count]]
   :emit [{:in #{[:* :tps-history]}
           :fn emit-tps-history
           :init init-tps-history}]})
					
  • Possible, but harder with derives.
  • Output has to be well defined in mapping, so we need to use the common root.
  • Then the derive function needs to figure out what part of the model to update under this common subtree.

Rendering


(defn render-tps-chart [_ [_ _ _ new-value] _]
  (when (not-empty new-value)
    (let [{:keys [received processed]} new-value
          data [{:data received :label "Received TPS"}
                {:data processed :label "Processed TPS"}]
          xaxis (get (aget (.getOptions @tps-chart) "xaxes") 0)
          [last-time _] (last received)
          one-minute-ago (- last-time 60000)]
      (aset xaxis "min" one-minute-ago)
      (aset xaxis "max" last-time)
      (.setData @tps-chart (clj->js data))
      (.setupGrid @tps-chart)
      (.draw @tps-chart))))		
      
(defn render-config []
  [[:value [:tps] render-tps-chart]])			
					

Result - Data UI

Result - Application

Stage 7: Backlog

Draw a chart with backlog: received - processed

Behavior - derive backlog


(defn derive-backlog [old {:keys [received processed]}]
  (let [[received-ts received-count] received
        [processed-ts processed-count] processed]
    (if (= received-ts processed-ts)
      [received-ts (- received-count processed-count)]
      old)))					
					
(def monitor-app
  {:derive #{[{[:received :count] :received, [:processed :count] :processed} 
              [:backlog :count] 
              derive-backlog 
              :map]
             
             [#{[:backlog :count]} 
              [:backlog :count-history] 
              derive-history 
              :single-val]}})
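
With the :map arg-type the inputs arrive as a map keyed by the names given in the binding (:received and :processed here). A sketch of what derive-backlog returns for made-up inputs:

```clojure
(defn derive-backlog [old {:keys [received processed]}]
  (let [[received-ts received-count] received
        [processed-ts processed-count] processed]
    (if (= received-ts processed-ts)
      [received-ts (- received-count processed-count)]
      old)))

;; Both counters are from the same second: a new backlog value.
(println (derive-backlog nil {:received [1000 50] :processed [1000 42]}))
;; prints [1000 8]

;; Timestamps differ: keep the previous value until both catch up.
(println (derive-backlog [1000 8] {:received [2000 60] :processed [1000 42]}))
;; prints [1000 8]
```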
					

Behavior - emitter


(defn init-backlog-history []
  [[:node-create [:backlog] :map]])

(defn emit-backlog-history [{:keys [input-paths new-model]}]
  (let [in-path (first input-paths)
        val (get-in new-model in-path)]
    (when val [[:value [:backlog] val]])))

(def monitor-app
  {:emit [{:in #{[:backlog :count-history]}
           :fn emit-backlog-history
           :init init-backlog-history}]})
					

Rendering


(def charts (atom {}))
(defn prepare-chart [id]
  (fn [_ _ _] (let [chart ($/plot id (clj->js [])
                                     (clj->js {:xaxis { :mode "time"}}))]
                (swap! charts assoc id chart))))

(defn render-chart [data max-time id]
  (let [chart (@charts id)]
    ; [update min/max value of X axis...]
    (.setData chart (clj->js data))
    (.setupGrid chart)
    (.draw chart)))

(defn render-tps-chart [_ [_ _ _ new-value] _] (omitted))

(defn render-backlog-chart [_ [_ _ _ new-value] _]
  (when (not-empty new-value)
    (let [data [{:data new-value :label "Backlog"}]
          [last-time _] (last new-value)]
      (render-chart data last-time "#backlog"))))

(defn render-config []
  [[:node-create [:tps] (prepare-chart "#received_counts")]
   [:value [:tps] render-tps-chart]
   [:node-create [:backlog] (prepare-chart "#backlog")]
   [:value [:backlog] render-backlog-chart]])
					

Result

Stage 8: Multiple servers

Turn the "processed" counter to a sum of several counters representing individual servers

Services

Was:
					
(def processed-count (atom 0))

(defn receive-messages [input-queue]
  ; [...]
  (p/put-message input-queue {msg/type :set-value
                              msg/topic [:processed :count]
                              :value @processed-count
                              :tstamp ts-seconds })) 
					
Now:

(def processed-counters (atom {"gabby" 0 "nicky" 0}))

(defn receive-messages [input-queue]
  ; [...]
  (p/put-message input-queue {msg/type :set-value
                              msg/topic [:server :count]
                              :tstamp ts-seconds
                              :value @processed-counters}))
					

Behavior


(defn sum-server-counts [_ [time server-counts]]
  [time (reduce #(+ %1 (%2 1)) 0 server-counts)])

(def monitor-app
  {:derive #{[#{[:server :count]} 
              [:processed :count] 
              sum-server-counts 
              :single-val]}})
					
That is all. Everything else just works, cascading updates to other metrics.
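
A sketch of what the new derive does, assuming [:server :count] holds a [timestamp counters-map] pair like the other counters. The variant using vals is only an alternative spelling added for comparison, not the author's code.

```clojure
(defn sum-server-counts
  "From the slide: (%2 1) picks the value out of each map entry."
  [_ [time server-counts]]
  [time (reduce #(+ %1 (%2 1)) 0 server-counts)])

(defn sum-server-counts-vals
  "Hypothetical alternative: same result via vals."
  [_ [time server-counts]]
  [time (apply + (vals server-counts))])

(def sample [1382994414000 {"gabby" 3 "nicky" 4}])

(println (sum-server-counts nil sample))
;; prints [1382994414000 7]
(println (= (sum-server-counts nil sample)
            (sum-server-counts-vals nil sample)))
;; prints true
```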

Stage 9: Draw server TPS

Draw one more chart, showing messages processed per second by each server.

Derive TPS and history from counter


(defn server-tps [old {:keys [old-model new-model]}]
  (if-let [[_ old-count] (get-in old-model [:server :count])]
    (let [[new-time new-count] (get-in new-model [:server :count])
          server-names (keys new-count)]
      (loop [servers server-names, accumulator {}]
        (let [[server & servers] servers]
          (if server
            (recur servers (assoc accumulator server
                                  (- (new-count server) (old-count server))))
            [new-time accumulator]))))))					
					
(def monitor-app
  {:derive 
   #{[#{[:server :count]} [:server :tps] server-tps]
     [#{[:server :tps]} [:server :tps-history] derive-history :single-val]}})

					
Produces:

(def out {:server {:tps-history [[1382994414000 {"gabby" 1, "nicky" 7}]
                                 [1382994415000 {"gabby" 4, "nicky" 4}]
                                 [1382994416000 {"gabby" 3, "nicky" 4}]]}})
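
A sketch that runs server-tps on two made-up consecutive models. The second function, server-tps-compact, is a hypothetical shorter variant using for and into; it is here only to show that the loop computes a per-key difference.

```clojure
(defn server-tps [old {:keys [old-model new-model]}]
  (if-let [[_ old-count] (get-in old-model [:server :count])]
    (let [[new-time new-count] (get-in new-model [:server :count])
          server-names (keys new-count)]
      (loop [servers server-names, accumulator {}]
        (let [[server & servers] servers]
          (if server
            (recur servers (assoc accumulator server
                                  (- (new-count server) (old-count server))))
            [new-time accumulator]))))))

(defn server-tps-compact
  "Hypothetical variant: same per-server difference, without the loop."
  [_ {:keys [old-model new-model]}]
  (when-let [[_ old-count] (get-in old-model [:server :count])]
    (let [[new-time new-count] (get-in new-model [:server :count])]
      [new-time (into {} (for [[server n] new-count]
                           [server (- n (old-count server))]))])))

;; Hypothetical consecutive models.
(def models
  {:old-model {:server {:count [1382994413000 {"gabby" 10 "nicky" 20}]}}
   :new-model {:server {:count [1382994414000 {"gabby" 11 "nicky" 27}]}}})

(println (server-tps nil models))
;; prints [1382994414000 {gabby 1, nicky 7}]
```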
					

Emit


(defn emit-server-history [{:keys [new-model]}]
  (let [history (get-in new-model [:server :tps-history])
        servers (keys (second (first history)))]
    (loop [servers servers, accum {}]
      (let [[server & servers] servers]
        (if server
          (recur 
            servers 
            (assoc accum server (map (fn [[t m]] [t (m server)]) history)))
          [[:value [:server-tps] accum]])))))
					
(def monitor-app
  {:emit 
    ; No longer #{[:* :tps-history]}
   [{:in #{[:received :tps-history] [:processed :tps-history]} :fn emit-tps-history}
    {:in #{[:server :tps-history]} :fn emit-server-history}]})

					

(def val {:server {:tps-history [[1382994414000 {"gabby" 1, "nicky" 7}]
                                 [1382994415000 {"gabby" 4, "nicky" 4}]]}})
					

(def out [[:value [:server-tps] {"gabby" [[1382994414000 1]
                                          [1382994415000 4]]
                                 "nicky" [[1382994414000 7] 
                                          [1382994415000 4]]}]])
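
The input/output pair above can be verified directly, since the emitter only reads :new-model. In this sketch the sample is bound to sample-model rather than val, to avoid shadowing clojure.core/val.

```clojure
(defn emit-server-history [{:keys [new-model]}]
  (let [history (get-in new-model [:server :tps-history])
        servers (keys (second (first history)))]
    (loop [servers servers, accum {}]
      (let [[server & servers] servers]
        (if server
          (recur
            servers
            (assoc accum server (map (fn [[t m]] [t (m server)]) history)))
          [[:value [:server-tps] accum]])))))

(def sample-model
  {:server {:tps-history [[1382994414000 {"gabby" 1, "nicky" 7}]
                          [1382994415000 {"gabby" 4, "nicky" 4}]]}})

(def expected
  [[:value [:server-tps] {"gabby" [[1382994414000 1]
                                   [1382994415000 4]]
                          "nicky" [[1382994414000 7]
                                   [1382994415000 4]]}]])

(println (= expected (emit-server-history {:new-model sample-model})))
;; prints true
```

The emitter pivots the history from "one map of servers per timestamp" to "one time series per server", which is the shape the chart wants.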
					

Render


(defn render-server-tps-chart [_ [_ _ _ new-value] _]
  (when (not-empty new-value)
    (let [data (map 
                 (fn [[server values]] 
                   {:label (str server " TPS") :data values}) 
                 new-value)
          [server values] (first new-value) 
          [last-time _] (last values)]
      (render-chart data last-time "#server_tps"))))

(defn render-config []
  [[:node-create [:server-tps] (prepare-chart "#server_tps")]
   [:value [:server-tps] render-server-tps-chart]])
					

Result

Stage 10: Control the Simulator

Demonstrate form handling and jQuery integration.

Services - simulator params exposed


(def simulation-params (atom {:received {:mean 10 :sigma 2 :sine-amplitude 5}
                              :servers {"gabby" {:mean 6 :sigma 3}
                                        "nicky" {:mean 3 :sigma 1}}}))

(defn push-simulation-params [input-queue]
  (p/put-message input-queue {msg/type :set
                              msg/topic [:simulation]
                              :value @simulation-params}))

(defn connection-established [message input-queue]
  (reset! connected (:value message))
  (push-simulation-params input-queue))

(defn services-fn [message input-queue]
  (.log js/console (str "Sending message to server: " message))
  (case (msg/topic message)
    [:connect] (connection-established message input-queue)
    [:simulation] (reset! simulation-params (:value message)))) 

					

Behavior


(defn emit-simulation [{:keys [new-model old-model]}]
  (let [new-value [:value [:simulation] (:simulation new-model)]]
    (if (old-model :simulation)
      [new-value]
      [[:transform-enable [:simulation] :update]
       new-value])))					
					
(defn update-simulation-params [val]
  (when val [{msg/topic [:simulation] :value val}]))

(def monitor-app
  {:transform [[:set [:**] #(:value %2)]]
   
   :emit [{:in #{[:simulation]} :fn emit-simulation}]
   
   :effect #{[#{[:simulation]} update-simulation-params :single-val]}})
					

HTML

Long, boring, omitted.

Rendering - show form


(defn show-simulation-params [_ _ input-queue]
  (.click 
    ($ :#simulation_update_button)
    #(do 
       (update-simulation-state input-queue)
       (.preventDefault %)))
  (.show ($ :#simulation_params)))					
					
(defn render-config []
  [[:transform-enable [:simulation] show-simulation-params]])
					

Rendering - fill form with data


(defn set-on-form [id values path]
  (.val ($ id) (get-in values path)))

(defn set-simulation-params [_ [_ _ _ new-value] _]
  (set-on-form :#received_mean new-value [:received :mean])
  (set-on-form :#received_sigma new-value [:received :sigma])
  (set-on-form :#received_sine_amplitude new-value [:received :sine-amplitude])
  
  (set-on-form :#gabby_mean new-value [:servers "gabby" :mean])
  (set-on-form :#gabby_sigma new-value [:servers "gabby" :sigma])
  
  (set-on-form :#nicky_mean new-value [:servers "nicky" :mean])
  (set-on-form :#nicky_sigma new-value [:servers "nicky" :sigma]))				
					
(defn render-config []					
  [[:value [:simulation] set-simulation-params]])
					

Rendering - push data from form to input queue


(defn get-from-form [state id path]
  (assoc-in state path (cljs.reader/read-string (.val ($ id)))))

(defn update-simulation-state [input-queue]
  (let [state
        (-> {}
          (get-from-form :#received_mean [:received :mean])
          (get-from-form :#received_sigma [:received :sigma])
          (get-from-form :#received_sine_amplitude [:received :sine-amplitude])
          
          (get-from-form :#gabby_mean [:servers "gabby" :mean])
          (get-from-form :#gabby_sigma [:servers "gabby" :sigma])
          
          (get-from-form :#nicky_mean [:servers "nicky" :mean])
          (get-from-form :#nicky_sigma [:servers "nicky" :sigma]))]
    (p/put-message input-queue {msg/type :set
                                msg/topic [:simulation]
                                :value state})))
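
The threading above can be tried without a browser. This sketch replaces the jQuery lookup with a plain map (fake-form and get-from-fake-form are hypothetical names, not part of the app) and uses clojure.edn/read-string on the JVM, where the slide uses cljs.reader/read-string. It shows how each step adds one parsed value at its path.

```clojure
(require '[clojure.edn :as edn])

;; Hypothetical stand-in for the DOM: form field id -> text typed by the user.
(def fake-form {"#received_mean" "100"
                "#gabby_mean"    "60.5"})

;; Same shape as get-from-form, but reads from the map instead of jQuery.
(defn get-from-fake-form [state id path]
  (assoc-in state path (edn/read-string (fake-form id))))

;; Each step receives the map built so far as its first argument.
(def state
  (-> {}
      (get-from-fake-form "#received_mean" [:received :mean])
      (get-from-fake-form "#gabby_mean" [:servers "gabby" :mean])))

state
;=> {:received {:mean 100}, :servers {"gabby" {:mean 60.5}}}
```

Because the text is parsed with a reader, the values arrive in the state as numbers rather than strings.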
					

Result

Summary

Behavior/Dataflow


(def monitor-app
  {:transform [[:set-value [:**] set-count]
               [:set [:**] #(:value %2)]
               [:start [:connected] connect]]
   :derive #{[#{[:received :count]} [:received :tps] derive-tps]
             [#{[:received :tps]} [:received :tps-history] derive-history :single-val]
             
             [#{[:server :count]} [:processed :count] sum-server-counts :single-val]
             [#{[:server :count]} [:server :tps] server-tps]
             [#{[:server :tps]} [:server :tps-history] derive-history :single-val]
             
             [#{[:processed :count]} [:processed :tps] derive-tps]
             [#{[:processed :tps]} [:processed :tps-history] derive-history :single-val]
             
             [{[:received :count] :received, [:processed :count] :processed} 
              [:backlog :count] derive-backlog :map]
             [#{[:backlog :count]} [:backlog :count-history] derive-history :single-val]}
   :emit [{:in #{[:received :tps-history] [:processed :tps-history]}
           :fn emit-tps-history}
          {:in #{[:backlog :count-history]} :fn emit-backlog-history}      
          {:in #{[:server :tps-history]} :fn emit-server-history}
          {:in #{[:connected]} :fn emit-connected}
          {:in #{[:simulation]} :fn emit-simulation :init init-simulation}]
   :effect #{[#{[:connected]} start-connection :single-val]
             [#{[:simulation]} update-simulation-params :single-val]}})					
					

Notes on Behavior/Dataflow

  • Very concise overview of data flow in the entire application. Fits on a slide!
  • Just data manipulation, no code related to services or rendering.
  • Only pure functions. No mutability or side effects anywhere in our code.
  • Very friendly to unit testing.
  • Great productivity thanks to separation of concerns and simplicity of each building block.
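
To illustrate the testing point: a derive function is an ordinary function of its inputs, so it can be checked with plain assertions. The body below is a guess, since this section names derive-backlog but does not show it. It assumes that with the :map argument spec the second argument is a map keyed by the aliases from the dataflow definition (:received, :processed).

```clojure
;; Hypothetical body, named -sketch to avoid implying it is the app's code.
;; Assumed signature: (fn [old-value inputs]) with inputs as an alias map.
(defn derive-backlog-sketch [_old-value {:keys [received processed]}]
  (- received processed))

;; No DOM, no server, no app state: just call the function.
(assert (= 30 (derive-backlog-sketch nil {:received 100 :processed 70})))
(assert (= 0  (derive-backlog-sketch 30  {:received 100 :processed 100})))
```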

Rendering


(defn render-config []
  [[:node-create [] hide-simulation-params]
   
   [:node-create [:tps] (prepare-chart "#received_counts")]
   [:value [:tps] render-tps-chart]
   
   [:node-create [:backlog] (prepare-chart "#backlog")]
   [:value [:backlog] render-backlog-chart]
   
   [:node-create [:server-tps] (prepare-chart "#server_tps")]
   [:value [:server-tps] render-server-tps-chart]
   
   [:transform-enable [:simulation] show-simulation-params]
   [:value [:simulation] set-simulation-params]
   
   [:transform-enable [:connected] (h/add-send-on-click "connect_button")]
   [:transform-disable [:connected] hide-connect-button]])
					

Notes on Rendering

  • GUI separated from dataflow and services.
  • Just DOM manipulation and rendering, no code related to services or application state.
  • Mostly impure functions.

Services


(defn services-fn [message input-queue]
  (case (msg/topic message)
    [:connect] (connection-established message input-queue)
    [:simulation] (reset! simulation-params (:value message)))) 
					
  • Very simple handler. It is invoked for every message from the service queue and has access to the input queue, so it can update application state with server data.
  • Just the services. No GUI, little concern about application state.

Did Not Fit

Renderer Templates

Pedestal comes with an Enlive wrapper, enabling:
  • Composing the page from smaller "tiles"
  • Populating bindings in each "tile" with data (think AngularJS with manual push)

Focus

  • Support for multi-screen apps.
  • Control which changes are propagated to the renderer.

(def app-dataflow
  {:focus {:login [[:login]]
           :game  [[:main] [:pedestal]]
           :default :login}})

; To set focus, send:
(def msg {msg/topic msg/app-model msg/type :set-focus :name :game})
					

Continue Functions

Like derive, but generates a batch of transform messages to be processed in a single transaction, allowing arbitrary recursion.
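
A rough sketch of the idea, not taken from the app: it assumes a continue function receives its inputs and returns a collection of messages. Plain :type and :topic keys stand in for msg/type and msg/topic so it runs without Pedestal. The function name, the input keys and the topics are all hypothetical.

```clojure
;; Assumption: inputs arrive as a map, and the return value is a batch of
;; transform messages (or nil when there is nothing to do).
(defn reset-when-full-sketch [{:keys [current limit]}]
  (when (>= current limit)
    [{:type :set-value :topic [:current] :value 0}
     {:type :set-value :topic [:overflows] :value 1}]))

(reset-when-full-sketch {:current 3 :limit 10})  ;=> nil
(reset-when-full-sketch {:current 10 :limit 10}) ;=> two messages
```

Returning several messages at once is what separates this from derive, which produces a single value for a single path.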

Web Workers

Pedestal provides a utility to execute the entire dataflow in Web Workers for better performance.

Pedestal Service - Server Side

  • Routing tables
  • Asynchronous processing
  • Server-sent events
  • Streaming responses

Framework or Library?

  • Small, easily composable or replaceable pieces.
  • Can be used as just a dataflow lib.
  • Rendering: Pedestal/Enlive, jQuery, Knockout, AngularJS...
  • Services: Up to you