
Distributed performance tracing of Storm

Kweo provides rich and real-time user engagement features to website owners.
Having users interact with each other on a website through comments and polls encourages people to spend more time on the site.

For the Kweo platform we wanted to offer these features in near real time and at large scale.

One example of this is receiving a comment from a user. The following steps are needed:

  1. Receive the comment from the user
  2. Check if it is spam or unwanted content
  3. Notify interested parties of the comment
  4. Update search indexes
  5. Save the comment to a data store
  6. Update any statistics

Some of these actions can be fairly complex, and the only way to achieve a good response time for users or the owner of the site is to perform many of these actions asynchronously and in parallel, potentially on many separate servers.

Storm works well with this type of event processing. This article isn’t a Storm tutorial, but we use some Storm terms in the following sections:

  • Stream: a continuous sequence of messages flowing through Storm
  • Spout: a source of streams; it could read from a queue or a network socket
  • Bolt: a component that consumes messages from a stream, processes them and possibly emits messages for other bolts
  • Topology: a linked collection of spouts and bolts submitted to the Storm cluster

 

Each bolt can have its dependencies and concurrency levels configured outside the logic of the bolt itself. Storm handles passing messages between bolts, applies the configured ordering and grouping of messages, and selects which node each bolt processes a message on.
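As a rough illustration (not our production code), a simplified comment topology like the one pictured below could be wired together as follows. The spout and bolt class names are placeholders, and the package names assume a recent Apache Storm release:

```java
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;

public class CommentTopology {

    // Sketch of a simplified comment topology; CommentSpout, SpamCheckBolt,
    // CassandraSaveBolt, SolrIndexBolt and StatisticsBolt are illustrative
    // placeholder classes, not our actual implementation.
    public static TopologyBuilder build() {
        TopologyBuilder builder = new TopologyBuilder();

        // Read comments from the message queue with two spout instances.
        builder.setSpout("comments", new CommentSpout(), 2);

        // Spam checking can run anywhere, so distribute tuples randomly
        // across four parallel bolt instances.
        builder.setBolt("spam-check", new SpamCheckBolt(), 4)
               .shuffleGrouping("comments");

        // Saving, indexing and statistics all consume the checked comments
        // in parallel.
        builder.setBolt("save", new CassandraSaveBolt(), 2)
               .shuffleGrouping("spam-check");
        builder.setBolt("index", new SolrIndexBolt(), 2)
               .shuffleGrouping("spam-check");

        // Statistics are grouped per site, so counts for one site always
        // land on the same task (assumes the spam checker emits a siteId field).
        builder.setBolt("stats", new StatisticsBolt(), 2)
               .fieldsGrouping("spam-check", new Fields("siteId"));

        return builder;
    }
}
```

The parallelism hints and groupings live entirely in the topology definition, so changing how a bolt is distributed doesn’t touch the bolt’s own code.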

 

 

Simplified Kweo comment topology


 

Storm’s built-in performance metrics

Storm provides some high-level aggregate metrics (see image below), but nothing that allows us to track a message as it flows through a webservice, onto a queue, into Storm, Solr and Cassandra. Sampling 1% of messages on each bolt for performance metrics won’t answer functional questions like “did the comment with a certain spam score actually get saved?”. Averaging latencies from different components can’t show us that only messages from a single client are performing poorly, or where the time is spent.

Sampling or aggregating latencies at each tier isn’t enough to show engineers the real root cause of performance or functional issues. A full end-to-end view of the transaction is required: distributed performance tracing.

 

Storm UI

High-level aggregate metrics in the Storm UI don’t show actual performance issues

Tracing each message instead of sampling

 

To track a message as it moves through the cluster, the following steps are required:

 

  1. Generate a globally unique trace ID for each transaction
  2. Pass the trace ID to each part of the distributed system
  3. Each system component needs to generate its own sub-trace ID for its part of the distributed trace
  4. Each application in the cluster needs to extract the parent trace ID and “join” the distributed trace
  5. A central server correlates each piece of the distributed transaction from all the different applications and stores it, ready for searching and visualization

 

Passing the trace ID between applications depends on the protocol in use. Some protocols have metadata fields, like headers in HTTP, which makes passing the ID relatively simple. We decided to always include an optional trace ID in our Storm processing tasks.

This approach to distributed tracing can easily add timing information for each application or component within a cluster. A simple implementation would just save timing information and the trace ID along with any other relevant parameters to a database.
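A minimal sketch of that idea follows; `TraceStore` is a hypothetical sink that writes each record to a database for later correlation, not a real library:

```java
import java.util.UUID;

// Minimal sketch of per-component tracing. Each component times its own work,
// tags the record with the shared trace ID and ships it to a central store.
public final class SimpleTracer {

    // Generate a globally unique trace ID at the entry point of a transaction.
    public static String newTraceId() {
        return UUID.randomUUID().toString();
    }

    // Run a piece of work and record how long it took under the given trace ID.
    public static void traced(String traceId, String component, Runnable work) {
        long start = System.nanoTime();
        try {
            work.run();
        } finally {
            long elapsedMicros = (System.nanoTime() - start) / 1_000;
            TraceStore.save(traceId, component, elapsedMicros); // hypothetical sink
        }
    }
}
```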

 

Distributed performance tracing within Storm

 

Storm chooses where each bolt executes based on how the topology is configured, the nodes in the cluster and the scheduler. The location where a bolt runs can change if a node fails or the topology’s concurrency settings are changed.

This means we need to join the distributed trace at the beginning of the bolt’s execute method. Anything the bolt emits needs the trace ID added, and the bolt may also end its piece of the trace.

We opted to put this logic in a base class for our bolts and a wrapper around the collector, which ensures the trace ID is always passed along. This way our Storm bolts don’t need explicit trace management code.
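A minimal sketch of that base class, assuming Storm 1.x-style APIs, that the trace ID travels as the first tuple field, and that `Tracer` wraps whatever tracing backend is in use (all of these are illustrative, not our exact implementation):

```java
import java.util.List;
import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

// Sketch of a tracing base bolt. It joins the distributed trace at the start
// of execute() and re-attaches the trace ID to everything the bolt emits,
// so subclasses contain no trace management code.
public abstract class TracingBaseBolt extends BaseRichBolt {

    private OutputCollector collector;

    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        // Convention (illustrative): the trace ID travels as the first tuple field.
        String traceId = input.getString(0);
        Tracer.joinTrace(traceId);              // hypothetical tracing helper
        try {
            process(input);
            collector.ack(input);
        } catch (Exception e) {
            collector.fail(input);
        } finally {
            Tracer.endTrace();                  // close this bolt's piece of the trace
        }
    }

    // Subclasses implement their business logic here and emit via emitTraced().
    protected abstract void process(Tuple input);

    // Wrapper around the collector that always prepends the current trace ID.
    protected List<Integer> emitTraced(Tuple anchor, Values values) {
        Values withTrace = new Values();
        withTrace.add(Tracer.currentTraceId());
        withTrace.addAll(values);
        return collector.emit(anchor, withTrace);
    }
}
```

Concrete bolts then override `process()` and emit through `emitTraced()`, so they never touch the trace ID handling themselves.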

 

Implementation with Compuware APM/dynaTrace

 

As already covered in our previous engineering blog post, the Kweo team is using dynaTrace, the application performance management solution from Compuware. dynaTrace has out-of-the-box support for most applications, application servers and libraries, and it is already tracing our Cassandra, Solr and client-side browser code. However, Storm is still quite new, so support for it is not yet provided out of the box. For these scenarios Compuware supplies the Agent Development Kit (ADK) for custom applications or protocol tagging.

The API is fairly simple and mostly involves marking where each part of a transaction (or PurePath) begins and ends.

Our webservice tier retrieves the current traceId or creates a new one, then passes the traceId via the message queue (Apache Kafka) to Storm. Each bolt starts a new server sub-path, performs its work and then ends the sub-path. Whenever a message is emitted, a new sub-traceId is created and linked with the current PurePath to join them together.

This chains multiple sub-paths together until there are either no more bolts in the Storm topology or control of the message is passed to another application (e.g. back to the web tier), where the PurePath can continue.
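On the webservice side, the entry point of this chain looks roughly like the sketch below, shown with the current Kafka producer API; the wire format carrying the trace ID alongside the comment payload is purely illustrative:

```java
import java.util.Properties;
import java.util.UUID;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

// Sketch of the webservice side: reuse the current trace ID if one exists,
// otherwise create one, and ship it alongside the comment payload so the
// Storm spout can join the same distributed trace.
public class CommentPublisher {

    private final KafkaProducer<String, String> producer;

    // kafkaProps must include bootstrap servers and key/value serializers.
    public CommentPublisher(Properties kafkaProps) {
        this.producer = new KafkaProducer<>(kafkaProps);
    }

    public void publish(String siteId, String commentJson, String existingTraceId) {
        String traceId = existingTraceId != null ? existingTraceId
                                                 : UUID.randomUUID().toString();
        // Illustrative wire format: trace ID prepended to the JSON payload.
        String payload = traceId + "|" + commentJson;
        producer.send(new ProducerRecord<>("comments", siteId, payload));
    }
}
```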

Example Source Code
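As an illustration of the base-class approach, here is what a concrete bolt might look like on top of the `TracingBaseBolt` sketched earlier. The business logic contains no trace management code; `SpamScorer` and the tuple layout are illustrative placeholders, not our actual implementation:

```java
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

// Example of a concrete bolt built on the TracingBaseBolt sketched earlier.
public class SpamCheckBolt extends TracingBaseBolt {

    @Override
    protected void process(Tuple input) {
        // Field 0 is the trace ID (handled by the base class); the comment
        // body follows in field 1 under this illustrative tuple layout.
        String comment = input.getString(1);
        double spamScore = SpamScorer.score(comment);   // placeholder spam checker

        // emitTraced() prepends the current trace ID before emitting.
        emitTraced(input, new Values(comment, spamScore));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("traceId", "comment", "spamScore"));
    }
}
```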

An example distributed trace

 

Here’s a sample trace of the comment posting example. The browser invokes a webservice on our webservice tier (Netty), which sends the message on to Storm and Solr.

 

High Level Topology View


 

This view has assembled the traceIds from each individual component into one distributed trace, a PurePath.

We can already see that a majority of the time is spent on Cassandra and Solr.

 


Expanded view showing how the transaction interacts with each node in the cluster

 

Expanding each tier of the application shows how a single message or a group of messages interacted with the different nodes in the cluster. We can see both Cassandra nodes are being used roughly evenly, so we don’t have any hotspots in the Cassandra cluster. There is one request sent to Solr, and internally it has gone from one Solr node to another; this is the result of data replication within a clustered Solr configuration.

Spending 35% of the total distributed transaction time on a single Solr node is a little high, and under load it could cause messages being processed in Storm to back up.

 

PurePath Details


 

Drilling further into the PurePath tree, we can see that Storm node test-02 has sent a Solr update request to solr-test-01. Solr then executes a FileDescriptor sync to flush the index to disk, which takes 40 ms. Flushing the Solr index to disk for a single document update is probably not required, so we should look at tuning our Solr commit settings.
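One option is to stop issuing a hard commit per document and let Solr batch flushes instead, for example with SolrJ’s commitWithin. A minimal sketch (the 10 second window below is illustrative, not a tested value):

```java
import java.io.IOException;

import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.common.SolrInputDocument;

// Sketch: let Solr batch index flushes instead of committing per document.
public class CommentIndexer {

    private final SolrClient solr;

    public CommentIndexer(SolrClient solr) {
        this.solr = solr;
    }

    public void index(String commentId, String body) throws SolrServerException, IOException {
        SolrInputDocument doc = new SolrInputDocument();
        doc.addField("id", commentId);
        doc.addField("body", body);

        // commitWithin lets Solr group many updates into one commit/fsync
        // instead of flushing the index to disk for every single document.
        solr.add(doc, 10_000);
    }
}
```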

 

 

Final thoughts

 

Having visibility over distributed systems has always been important. Newer technologies like Storm and Hadoop manage execution of your code across a very large number of servers without you having to worry about those details. However, when tracking down issues across dozens of applications and hundreds of servers, a single distributed trace of any message makes finding costly errors or performance issues a lot easier.