Joseph Wilk

Joseph Wilk

Programming bits and bobs

Building Clojure Services at Scale

At SoundCloud I’ve been experimenting over the last year with how we build the services that power a number of heavily loaded areas across our site. All these services have been built in Clojure with bits of Java tacked on the sides. Here are some of my personal thoughts on how to build Clojure services:

Netflix or Twitter

choose your own adventure

At some-point when you require a sufficient level of scaling you turn to the open source work of Twitter with Finagle or Netflix with Hystrix/RxJava. Netflix libs are written in Java while Twitters are written in Scala. Both are easy to use from any JVM based language but the Finagle route will bring in an extra dependency on Scala. I’ve heard little from people using interop between Clojure & Scala and that extra Scala dependency makes me nervous. Further I like the simplicity of Netflix’s libs and they have been putting a lot of effort into pushing support for many JVM based languages.

Hence with Clojure, Netflix projects are my preference. (I should add we do use Finagle with Scala at SoundCloud as well).

Failure, Monitoring & Composition Complexity

In a service reliant on other services, databases, caches any other external dependencies its a guarantee at some-point some of those will fail. When working with critical services we want to gracefully provide a degraded service.

While we can think about degrading gracefully in the case of failure, ultimately we want to fix wants broken as soon as possible. An eye into the runtime system allows us to monitor exactly whats going on and take appropriate action.

Your service needs to call other services. Dependent on those service results you might need to call other services which in turn might require calls to other services. Composing service calls is hard to get right without a tangle of complex code.

Fault Tolerance

How should we build fault tolerance into our Clojure services?

Single thread pool

Consider you have this line within a service response:

1
{:body @(future (client/get "http://soundcloud.com/blah/wah")) :status 200}

Now http://soundcloud.com/blah/wah goes down and those client requests start getting blocked on the request. In Clojure all future calls acquire a thread from the same thread pool. In our example the service is blocked up, is pilling new requests onto the blocked pool and we are in trouble.

My first solution to this problem was to introduce circuit breakers (https://github.com/josephwilk/circuit-breaker). I also stop using @ to dereference futures and used deref http://clojuredocs.org/clojure_core/clojure.core/deref which supports defaults and timeouts.

1
2
3
4
5
6
7
8
9
10
11
(defncircuitbreaker :blah-http {:timeout 30 :threshold 2})

(def future-timeout 1000)
(def timeout-value nil)

(defn http-get [url]
  (with-circuit-breaker :blah-http {
    :connected (fn [] (client/get "http://soundcloud.com/blah/wah"))
    :tripped (fn [] nil)}))

{:body (http-get http://www.soundcloud.com/blah/wah) :status 200}

Problem solved, now even though the thread pool may become blocked we back off the following requests and avoid pilling more work onto the blocked thread pool.

This worked pretty well, but then we decided we would to try and go even further in gracefully degrading. Why don’t we serve from a cache on failure, slightly stale data is better than none.

1
2
3
4
(defn http-get [url]
  (with-circuit-breaker :blah-http {
    :connected (fn [] (client/get "http://soundcloud.com/blah/wah"))
    :tripped (fn [] (memcache/get client url))}))

Now consider (client/get "http://soundcloud.com/blah/wah") starts failing, the thread pool gets blocked up, the circuit trigger flips and (memcache/get client url) is now fighting to get threads from the blocked thread pool.

Pants.

Scheduling over thread pools: Hystrix

Hystrix is Netflix library which I think of as circuit breakers on steroids.

Hystrix is a latency and fault tolerance library designed to isolate points of access to remote systems, 
services and 3rd party libraries, stop cascading failure and enable resilience in complex distributed systems 
where failure is inevitable.

Dave Ray (http://darevay.com) has been doing lots of excellent work on producing Clojure bindings for Hystrix:

Hystrix gives me 2 big wins:

1. Separation of thread pools

Hystrix creates a separate thread pool for each Clojure namespace, if one thread pool becomes blocked due to a failure, a circuit breaker can be triggered with a fallback that uses a different thread pool.

This however does come with a cost:

  1. We have a performance hit due to moving to a scheduling based method for executing Hystrix commands.
  2. We cannot use Clojure’s concurrency primitives (futures/promises/agents).

Here is an example of our service rewritten with Hystrix:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
(ns example
  (:require [[com.netflix.hystrix.core :as hystrix]]))

(hystrix/defcommand http-get
  {:hystrix/fallback-fn (fn [url] (memcache-get url)}
  [url]
  (client/get url))

(hystrix/defcommand memcache-get
  {:hystrix/fallback-fn (constantly nil)}
  [url]
  (memcache/get client key))

(defn http-get [url]
   {:body (http/get "http://soundcloud.com/blah/wah") :status 200})

Beautiful, Just adding the defcommand brings us fault tolerance with no other changes to the shape of our code.

See the Hystrix Clojure adapter for all the possible configuration: https://github.com/Netflix/Hystrix/tree/master/hystrix-contrib/hystrix-clj

2. Monitoring

Hystrix supports exposing metrics on all circuit breakers within a process. It exposes these calls through an event stream.

How you expose that Hystrix event stream depends slightly on which web server you are using with your Clojure app.

Netty and Hystrix Event Streams (without servlets)

https://github.com/josephwilk/hystrix-event-stream-clj

1
2
3
(:require [hystrix-event-stream-clj.core as hystrix-event])

(defroutes app (GET "/hystrix.stream" (hystrix-event/stream))

Jetty and Hystrix Event Streams (with servlets)

If they are using Jetty you will need to change your app to add your main web app as a servlet. Then we can also add the Hystrix event stream servlet.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
(ns sc-clj-kit.hystrix.jetty
  (:import [com.netflix.hystrix.contrib.metrics.eventstream HystrixMetricsStreamServlet])
  (:import [org.eclipse.jetty.server Server])
  (:import [org.eclipse.jetty.servlet ServletContextHandler])
  (:import [org.eclipse.jetty.servlet ServletHolder])
  (:import [org.eclipse.jetty.server.bio SocketConnector])
  (:import [org.eclipse.jetty.server.ssl SslSocketConnector])

  (:import (org.eclipse.jetty.server Server Request)
           (org.eclipse.jetty.server.handler AbstractHandler)
           (org.eclipse.jetty.server.nio SelectChannelConnector)
           (org.eclipse.jetty.server.ssl SslSelectChannelConnector)
           (org.eclipse.jetty.util.thread QueuedThreadPool)
           (org.eclipse.jetty.util.ssl SslContextFactory)
           (javax.servlet.http HttpServletRequest HttpServletResponse))
  (:require
   [compojure.core          :refer :all]
   [ring.adapter.jetty      :as jetty]
   [ring.util.servlet :as servlet]))

(defn run-jetty-with-hystrix-stream [app options]
  (let [^Server server (#'jetty/create-server (dissoc options :configurator))
        ^QueuedThreadPool pool (QueuedThreadPool. ^Integer (options :max-threads 50))]
    (when (:daemon? options false) (.setDaemon pool true))
    (doto server (.setThreadPool pool))
    (when-let [configurator (:configurator options)]
      (configurator server))
    (let [hystrix-holder  (ServletHolder. HystrixMetricsStreamServlet)
          app-holder (ServletHolder. (servlet/servlet app))
          context (ServletContextHandler. server "/" ServletContextHandler/SESSIONS)]
      (.addServlet context hystrix-holder "/hystrix.stream")
      (.addServlet context app-holder "/"))
    (.start server)
    (when (:join? options true) (.join server))
    server))

(defroutes app (GET "/hello" [] {:status 200 :body "Hello"})

(run-jetty-with-hystrix app {:port http-port :join? false})

Aggregation and discovery

While you can monitor a single process using Hystrix in our example we have many processes serving an endpoint. To aggregate all these Hystrix metric services we use Turbine.

Physical endpoints for a service at SoundCloud are discovered using DNS lookup. We configured Turbine to use this method to auto discover which machines are serving an endpoint.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
(ns sc-turbine.discovery
  (:import [org.xbill.DNS Type]
           [com.netflix.turbine.discovery InstanceDiscovery Instance])
  (:require [clj-dns.core :refer :all]))

(gen-class
   :name ScInstanceDiscovery
   :implements [com.netflix.turbine.discovery.InstanceDiscovery])

(defn -getInstanceList [this]
  (map (fn [instance]
         (Instance. (str (:host instance) ":" (:port instance)) "example-prod" true))
       (map (fn [answer] {:host (-> answer .getTarget str) :port (.getPort answer)})
            (:answers (dns-lookup "" Type/SRV)))))

And the config.properties:

1
2
3
InstanceDiscovery.impl=ScInstanceDiscovery
turbine.aggregator.clusterConfig=example-prod
turbine.instanceUrlSuffix=/hystrix.stream

Putting this all together our monitoring looks like this:

Pretty graphs: Hystrix Dashboard

Finally we run the Hystrix Dashboard and watch our circuit breakers live.

And heres an example with triggered circuit breakers:

Since I cannot show you the dashboard running, you will have to make do with music generated from the metrics. I normalize the live Hystrix metrics to piano pitches and play the notes as the arrive from the stream.

Hystrix Metrics as Sounds

Complexity & Composition

Working with many services, composition of service calls becomes hard to think and write about. Callbacks try to solve this but nested callbacks leave us with a mess.

RxJava tries to solve this using the Reactive Functional model. While RxJava provides lots of features I see it primarily as a way of expressing concurrent actions as a directed graph which provides a single callback on success or failure. The graph is expressed in terms or maps/reduces/filters/etc, things we are familiar with in the functional world.

To separate the request thread from the response thread we use RxJava with Netty and Aleph.

Here is a very simple example firing 2 concurrent requests and then joining the results into a single map response:

1
2
3
4
5
6
7
8
9
;;Hystrix integrates with RxJava and can return Observables for use with Rx.
(defn- find-user-observable [id] (hystrix/observe #'find-user id))

(defn- meta-data [user-urn]
  (let [user-observable (-> (find-user-observable id) (.map (λ [user] ...)))
        meta-observable (-> (find-user-meta-observable id) (.map (λ [subscription] ...))))
    (-> (Observable/zip user-observable
                        meta-observable
                        (λ [& maps] {:user (apply merge maps)}))))

The function meta-data returns an Observerable which we subscribe to and using Aleph return the resulting JSON to a channel.

1
2
3
4
5
6
7
8
9
(defn- subscribe-request [channel request]
  (let [id (get-in request [:route-params :id])]
    (-> (meta-data id)
        (.subscribe
          #(enqueue channel {:status 200 :body %}))
          #(logging/exception %))))))

(defroutes app
  (GET "/users/:id" [id] (wrap-aleph-handler subscribe-request)))

The shape of the RxJava Clojure bindings are still under development.

That single thread pool again…

With RxJava we are also in a situation were we cannot use Clojure’s future. In order for RxJava to block optimally we don’t want to use a single thread pool. Hence we use Hystrix as our means of providing concurreny.

Clojure services at scale

I’m very happy with the shape of these services running at SoundCloud. In production they are performing very well under heavy load with useful near realtime monitoring. In part thanks to Netflix’s hard work there is no reason you cannot write elegant Clojure services at scale.

Comments