I’ve been working over the last year in the data team at SoundCloud building a realtime data pipeline using Clojure and Amazon’s Kinesis. Kinesis is Amazons equivalent to Kafka, “Real-Time data processing on the Cloud”. This is a summary of what was built, some lessons learnt and all the details in-between.
Tapping Real traffic
The first step was to tee the traffic from a live system to a test system without comprising its function. The main function of the live system is logging JSON events to file (which eventually end up somewhere like HDFS). Tailing the logs of the live system gives us access to the raw data we want to forward on to our test system. A little Go watches the logs, parses out the data and then forwards them in batch to test instances that will push to kinesis. Hence we had live data flowing through the system and after launch a test setup to experiment with. Sean Braithwaite was the mastermind behind this little bit of magic.
Sending to Kinesis
All kinesis sending happens in an application called the EventGateway (also written in Clojure). This endpoint is one of the heaviestly loaded services in SoundCloud (at points it has more traffic than the rest of SoundCloud combined). The Eventgateway does a couple of things but at its core it validates and broadcasts JSON messages. Hence this is where our Kinesis client slots in.
Squeezing Clojure Reflection
Its worth mentioning that in order for the Eventgateway service to be performant we had to remove all reflection in tight loops through type hints. It simply could not keep up without this. It became a common pattern to turn reflection warnings on while working in Clojure.
The Eventgateway posts to Kinesis in batch using a ConcurrentLinkedQueue and separate producers and consumers. Messages are pushed into a
ConcurrentLinkedQueue. We rolled our own Clojure kinesis client using Amazons Java library rather than using Amazonica.
1 2 3
Amazonica was good to get started quickly in the initial phase but there are a couple of reasons we switched to our own unique snowflake (which still looked a little like Amazonica):
- Amazonica did not support batch mode for Kinesis. Under initial tests it was impossible to scale this without batch.
- Injecting our own telemetry at low levels to learn more about Kinesis running.
- Some of its sensible defaults where not so sensible (for example default encoding the data using nippy).
- Ultimately most of any Kinesis client/server is configuration and tuning.
- Amazonica’s source is hard to read with a little too much
1 2 3 4 5 6
Pushing Messages in a Queue
Very simple, just adding a message to the
ConcurrentLinkedQueue. A environment variable allows us to gradually scale up or down the percentage of traffic that is added to the queue.
1 2 3 4 5 6 7 8 9 10 11 12
The queue pusher operates within a wider system and any failures due to Amazon being unreachable should not impede the function of the system. For the client this means:
- Not exceeding memory limits with a hard queue size (since ConcurrentLinkedQueue is unbound in size).
- Backing off workers if the queue is full to prevent cpu throttling.
When we cannot send messages to kinesis we instead log them to disk, and into our normal logging pipeline (usually ending up in HDFS). Hence we coule replay at a later date if required.
Sending batches to Kinesis
The workers, operating in separate threads consuming messages from the
ConcurrentLinkedQueue collecting them into a batch:
1 2 3 4 5 6 7 8 9 10 11 12
When polling from the queue an exponential back-off if no messages are on the queue.
1 2 3 4 5 6 7 8 9 10
Once the batch is ready (in terms of age or size) its sent to Kinesis.
1 2 3 4 5 6 7 8 9 10 11
Note this is where we also decided the partition key. In our case its important for the same user to be located on the same partition. For example when consuming from Kinesis a worker is allocated a partition to work from and would miss events if they where across multiple partitions.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
Failure can occur on individual records within a batch or in the batch as a whole.
- These messages are re-added to the queue so we can try again. If the messages fail for some nth time they are considered invalid and rejected from kinesis and logged as an error.
Amazon had an Internal Failure. We don’t know what went wrong. (We see this regularly in normal function).
Amazon Kinesis is not resolvable (AmazonClientException/AmazonServiceException).
Exceeding the read/write limits of Kinesis (ProvisionedThroughputExceededException).
This is our backpressure signal, in which case at worst we need to log to disk for replay later
Consuming Messages from Kinesis
With the consumption of events we have a different application stream for every worker. All workers have their own streams, and own checkpoints so they operate independently of each other. Some example of the workers we gave running:
- Logging Events to s3
- Calculating listening time
- Forwarding certain messages on to various other systems (like RabbitMQ).
Launching a worker is pretty simple with the Amazon Java Kinesis library.
1 2 3 4 5 6 7 8
One of the hardest parts of setting up the a worker is getting the configuration right to ensure that the consumers are getting through the events fast enough. Events are held in Amazon for 24 hours after entry, and hence there is a minimum consumption rate.
Counting events in and events out with Prometheus made it easier to get the correct consumption rates.
Via the Amazon console you also get access to various graphs around read/write rates and limits:
Finally you can also look at Amazon’s Dynamodb instance for the Kinesis stream providing insight into metrics around leases, how many where revoked, stolen, never finished, etc.
Here is an example of one of our Kinesis workers configuration covered in scribblings of me trying to work out the right settings.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
Prometheus (http://prometheus.io/) a monitoring tool built at SoundCloud was core to developing, scaling and monitoring all of this pipeline. Amazon does provide some useful graphs within the AWS console but more detailed feedback was very helpful even if it was removed later.
Exception Logging pattern
All Exceptions are counted and sent to log. This was a very useful pattern for driving out errors and spotting leaks in the interactions with Kinesis and consumption:
(Using a Clojure wrapper around Prometheus: https://github.com/josephwilk/prometheus-clj)
1 2 3 4 5 6
Note Kinesis regularly spits out “InternalFailure” Exceptions. Thats all you get…
A Cloud Pipeline in Pictures
In my previous post about Building Clojure services at scale I converted the system metrics to sound. With so many machines processing so many events its easy to loose track of the amount of work being done in the cloud. To make this feel more real I captured metrics across all the machines involved and created 3d renderings using OpenFrameworks and meshes of the systems function:
This work constitues a team effort by the Data team at SoundCloud. A lot of advice, collaboration and hard work. Kudos to everyone.