Distributed performance tracing of Storm

Kweo provides rich, real-time user engagement features to website owners. Having users interact with each other on a website through comments and voting on polls encourages people to spend more time on the site.

For the Kweo platform we wanted to offer these features in near real time and at large scale.

One example of this is receiving a comment from a user. The following steps are needed:

  1. Receive the comment from the user
  2. Check if it is spam or unwanted content
  3. Notify interested parties of the comment
  4. Update search indexes
  5. Save the comment to a data store
  6. Update any statistics

Some of these actions can be fairly complex, and the only way to achieve a good response time for users and site owners is to perform many of them asynchronously and in parallel, potentially on many separate servers.

Storm works well with this type of event processing. This article isn’t a Storm tutorial (for that go here), but we use some Storm terms in the following sections:

  • Stream: a continuous sequence of messages flowing through Storm
  • Spout: a source of streams; it could read from a queue or a network socket
  • Bolt: a component that consumes messages from a stream, processes them and possibly emits messages for other bolts
  • Topology: a linked collection of spouts and bolts submitted to the Storm cluster

 

Each bolt can have its dependencies and concurrency levels configured outside the logic of the bolt itself. Storm passes messages between bolts, applies the configured ordering and grouping of messages, and selects the node on which each bolt processes a message.
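For illustration, wiring up a topology like this looks roughly as follows. The spout and bolt class names are placeholders for our own components rather than the actual Kweo code, and the parallelism numbers are only examples (older Storm versions use the backtype.storm packages shown here; newer ones use org.apache.storm):

```java
import backtype.storm.Config;
import backtype.storm.StormSubmitter;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;

// Hypothetical wiring of the simplified comment topology. CommentSpout,
// SpamCheckBolt, NotifyBolt, SolrIndexBolt, CassandraBolt and StatsBolt are
// placeholders for our own components.
public final class CommentTopology {

    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();

        builder.setSpout("comments", new CommentSpout(), 2);

        // Concurrency and wiring live here, outside the bolts themselves.
        builder.setBolt("spam-check", new SpamCheckBolt(), 4)
               .shuffleGrouping("comments");
        builder.setBolt("notify", new NotifyBolt(), 2)
               .shuffleGrouping("spam-check");
        builder.setBolt("solr-index", new SolrIndexBolt(), 2)
               .shuffleGrouping("spam-check");
        builder.setBolt("cassandra-store", new CassandraBolt(), 2)
               .shuffleGrouping("spam-check");
        // Group statistics per site (assumes spam-check declares a "siteId" field).
        builder.setBolt("stats", new StatsBolt(), 1)
               .fieldsGrouping("spam-check", new Fields("siteId"));

        Config conf = new Config();
        conf.setNumWorkers(4);
        StormSubmitter.submitTopology("comment-topology", conf, builder.createTopology());
    }
}
```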

 

 

Simplified Kweo comment topology

Storm’s built-in performance metrics

Storm provides some high-level aggregate metrics (see image below), but nothing that allows us to track a message as it flows through a webservice, onto a queue, and into Storm, Solr and Cassandra. Sampling 1% of messages on each bolt for performance metrics won’t answer functional questions like “did the comment with a certain spam score actually get saved?”. Averaging latencies from different components can’t show us that only messages from a single client are performing poorly, or where the time is spent.

Sampling or aggregating latencies at each tier isn’t enough to show engineers the real root cause of performance or functional issues. A full end-to-end view of the transaction (distributed performance tracing) is required.

 

High-level aggregate metrics in the Storm UI don’t show actual performance issues

Tracing each Message instead of Sampling

 

To track a message as it moves through the cluster, the following steps are required:

 

  1. Generate a globally unique trace ID for each transaction
  2. Pass the trace ID to each part of the distributed system
  3. Each system component generates its own sub-trace ID for its part of the distributed trace
  4. Each application in the cluster extracts the parent trace ID and “joins” the distributed trace
  5. A central server correlates the pieces of the distributed transaction from all the different applications and stores them ready for searches and visualisations

 

Passing the trace ID between applications depends on the protocol in use. Some protocols have metadata fields, like headers in HTTP, which make passing the ID relatively simple. We made the decision to always include an optional trace ID in our Storm processing tasks.
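For HTTP, for example, the trace ID can simply ride on a request header. A minimal sketch of what that could look like in a Netty-based webservice tier (the X-Trace-Id header name is an assumption, and this uses the Netty 4 HTTP codec API):

```java
import io.netty.handler.codec.http.FullHttpRequest;

import java.util.UUID;

// Illustrative only: "X-Trace-Id" is an assumed header name.
public final class TraceIds {

    // Reuse the caller's trace ID if one was sent, otherwise start a new trace.
    public static String fromRequest(FullHttpRequest request) {
        String traceId = request.headers().get("X-Trace-Id");
        return traceId != null ? traceId : UUID.randomUUID().toString();
    }
}
```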

This approach to distributed tracing can easily add timing information for each application or component within a cluster. A simple implementation would just save the timing information and the trace ID, along with any other relevant parameters, to a database.
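A minimal sketch of what such a simple implementation could look like (TraceStore and its save method are hypothetical placeholders for whatever database access layer is used):

```java
import java.util.UUID;
import java.util.concurrent.Callable;

// Hypothetical persistence interface; in practice this would write to a database.
interface TraceStore {
    void save(String traceId, String component, long elapsedMicros);
}

// Minimal sketch of the "simple implementation" described above.
public final class SimpleTracer {

    private final TraceStore store;

    public SimpleTracer(TraceStore store) {
        this.store = store;
    }

    // Generate a globally unique trace ID at the entry point of a transaction.
    public static String newTraceId() {
        return UUID.randomUUID().toString();
    }

    // Time a unit of work and persist the timing together with the trace ID.
    public <T> T trace(String traceId, String component, Callable<T> work) throws Exception {
        long start = System.nanoTime();
        try {
            return work.call();
        } finally {
            long elapsedMicros = (System.nanoTime() - start) / 1_000;
            store.save(traceId, component, elapsedMicros);
        }
    }
}
```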

 

Distributed performance tracing within Storm

 

Storm chooses where each bolt executes based on how the topology is configured, the nodes in the cluster and the scheduler. The location where a bolt runs can change if a node fails or the topology’s concurrency settings are changed.

This means we need to join the distributed trace at the beginning of the bolt’s execute method. Anything that the bolt emits will need to have the trace ID added, and possibly end that piece of the trace.

We opted to put this logic in a base class for our bolts and in a wrapper around the output collector, which ensures the trace ID is always passed along. This way our Storm bolts don’t have to contain explicit trace-management code.
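A rough sketch of that base class is shown below. The class, field and method names are illustrative rather than our actual code, the trace-joining calls are left abstract (they would delegate to whatever tracing backend is in use), and the collector wrapper is represented here as a simple emit helper:

```java
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

import java.util.Map;

// Hypothetical base class: joins the distributed trace at the start of
// execute() and makes sure emitted tuples always carry the trace ID.
public abstract class TracedBolt extends BaseRichBolt {

    // Assumes every stream declares a "traceId" field.
    public static final String TRACE_ID_FIELD = "traceId";

    protected OutputCollector collector;

    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        String traceId = input.getStringByField(TRACE_ID_FIELD);
        joinTrace(traceId);                 // start this bolt's piece of the trace
        try {
            doExecute(input, traceId);      // the bolt's real work
        } finally {
            endTrace(traceId);              // end this bolt's piece of the trace
            collector.ack(input);
        }
    }

    // Emit while making sure the trace ID always travels with the tuple.
    protected void emitTraced(Tuple anchor, String traceId, Object... values) {
        Values out = new Values(values);
        out.add(0, traceId);                // assumes traceId is the first declared output field
        collector.emit(anchor, out);
    }

    protected abstract void doExecute(Tuple input, String traceId);

    protected abstract void joinTrace(String traceId);  // e.g. start a sub-path in the tracing tool

    protected abstract void endTrace(String traceId);   // e.g. end the sub-path
}
```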

 

Implementation with Compuware APM/dynaTrace

 

As already covered in our previous engineering blog post, the Kweo team is using dynaTrace, the Application Performance Management solution from Compuware. dynaTrace has out-of-the-box support for most applications, application servers and libraries; it is already tracing our Cassandra, Solr and client-side browser code. However, Storm is still quite new, so support for it is not currently provided out of the box. For these scenarios Compuware supplies the Agent Development Kit (ADK) for custom applications or protocol tagging.

The API is fairly simple: it mostly consists of marking where each part of a transaction (or PurePath) begins and ends.

Our webservice tier retrieves the current traceId or creates a new one, then passes it via the message queue (Apache Kafka) to Storm. Each bolt starts a new server sub-path, performs its work and then ends the sub-path. Whenever a message is emitted, a new sub-traceId is created and linked with the current PurePath to join them together.
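As a rough sketch of the first hop, this is how a trace ID could travel from the webservice tier onto Kafka. The topic name and envelope format are assumptions rather than our real schema, and the example uses the current Kafka producer API for brevity; older Kafka versions have no per-record headers, which is why the ID is embedded in the payload here:

```java
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;
import java.util.UUID;

// Illustrative only: the "comments" topic and the JSON envelope are assumptions,
// not the real Kweo message schema.
public final class CommentPublisher {

    private final KafkaProducer<String, String> producer;

    public CommentPublisher(Properties kafkaProps) {
        // kafkaProps must contain the usual bootstrap.servers and serializer settings.
        this.producer = new KafkaProducer<>(kafkaProps);
    }

    public void publishComment(String existingTraceId, String commentJson) {
        // Reuse the trace ID from the incoming request if present,
        // otherwise start a new distributed trace here.
        String traceId = existingTraceId != null ? existingTraceId : UUID.randomUUID().toString();

        // The trace ID travels inside the payload so the Storm spout can read it
        // and attach it to every tuple it emits.
        String envelope = "{\"traceId\":\"" + traceId + "\",\"comment\":" + commentJson + "}";

        producer.send(new ProducerRecord<>("comments", traceId, envelope));
    }
}
```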

This chains multiple sub-paths together until there are either no more bolts in the Storm topology or control of the message is passed to another application (e.g. back to the web tier), where the PurePath can continue.

Example Source Code

An example distributed trace

 

Here’s a sample trace of the comment posting example. A webservice call goes from the browser to our webservice tier (Netty), which sends the message into Storm and Solr.

 

High Level Topology View

 

This view has assembled the traceIds from each individual component into one distributed trace, a PurePath.

We can already see that a majority of the time is spent on Cassandra and Solr.

 

Expanded view showing how the transaction interacts with each node in the cluster

 

Expanding each tier of the application shows how a single message, or a group of messages, interacted with the different nodes within the cluster. We can see both Cassandra nodes are being used roughly evenly, so we don’t have any hotspots in the Cassandra cluster. There is one request sent to Solr, and internally it has gone from one Solr node to another. This is the result of data replication within Solr in a clustered configuration.

35% of the total distributed transaction time spent on a single Solr node is a little high, and under load could cause messages being processed in Storm to bank up.

 

PurePath Details

 

Drilling further into the PurePath tree we can see that Storm node test-02 has sent a Solr update request to solr-test-01. Solr then executes a FileDescriptor sync to flush the index to disk, which takes 40ms. Flushing the Solr index to disk for a single document update is probably not required, so we should look at tuning our Solr commit settings.
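One common way to avoid a hard flush per document is to let Solr batch commits, either via the autoCommit/autoSoftCommit settings in solrconfig.xml or with commitWithin from the client. A minimal SolrJ sketch of the latter (the URL, collection and field names are placeholders, and this uses a recent SolrJ API):

```java
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.common.SolrInputDocument;

public final class CommentIndexer {

    // Placeholder URL; in practice this points at the Solr collection for comments.
    private final SolrClient solr =
            new HttpSolrClient.Builder("http://solr-test-01:8983/solr/comments").build();

    public void index(String commentId, String body) throws Exception {
        SolrInputDocument doc = new SolrInputDocument();
        doc.addField("id", commentId);
        doc.addField("body", body);

        // commitWithin lets Solr batch the flush instead of syncing the index
        // to disk for every single document update.
        solr.add(doc, 10_000);   // commit within 10 seconds
    }
}
```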

 

 

Final thoughts

 

Having visibility over distributed systems has always been important. Newer technologies like Storm and Hadoop manage the execution of your code across a very large number of servers without you having to worry about these details. However, when tracking down issues across dozens of applications and hundreds of servers, a single distributed trace of any message makes tracking down costly errors or performance issues a lot easier.

 


Application Performance Management for distributed real-time data processing

In our first engineering blog post we discussed the Kweo core requirements and the high-level design decisions we made.

As one of the most important requirements is real-time user interaction, Application Performance Management at scale is encoded into our engineering DNA. It starts with the architecture and continues through solution design into the development, test and production environments.

The Kweo distributed real-time data processing stack consists of the following major components:

 

The stack

  • Netty as an event-driven network application framework
  • Apache Kafka as a fault-tolerant, high-throughput distributed messaging system
  • Storm/Trident for distributed and fault-tolerant real-time computation
  • Apache Cassandra as a fault-tolerant, distributed column-oriented database

 

The challenge

We needed an APM solution which works in our development, test and production environments.
The solution needed to provide:

  • Infrastructure system monitoring and alerting
  • Application monitoring and alerting
  • Application Performance Management
  • Shows response time, CPU cost, API breakdown, suspensions (garbage collection) and IO time for each trace
  • Real-user monitoring
  • Historical metric storage for up to 365 days
  • Runs in our environment and not as an external SaaS model
  • Can be used for deep distributed transaction tracing and as a distributed profiler
  • Always on, every transaction can be traced
  • Can trace through our stack above end-to-end
  • Works well in Amazon EC2
  • Supports tracing through Apache, PHP, Java, zeromq and custom protocols

 

The APM candidates

 

Selection Criteria

| Criterion | CA Wily Introscope | AppDynamics | New Relic | Compuware dynaTrace |
|---|---|---|---|---|
| Java | YES | YES | YES | YES |
| Apache Webserver | YES | YES | YES | YES |
| Operating System Agent | YES | YES | YES | YES |
| PHP | NO | YES | YES | YES |
| Development kits for custom tagging and instrumentation through zeromq and custom protocols | NO: classes can be instrumented, but can't trace through modern protocols, zeromq, Apache Kafka or Storm | YES, but limited (can't trace through zeromq and Storm) | NO (Android and iOS coming soon) | YES, via the ADK (JS, Java, .NET, Android, iOS and native C) |
| Distributed tracing always on | NO | NO, sampling only | NO | YES |
| Distributed profiler always on | NO | NO, sampling only | NO | YES |
| Real-user monitoring | NO: requires another product, Customer Experience Manager (CEM) | YES, but limited for Web 2.0/HTML5 user experience; doesn't track entire visits | YES, but limited for Web 2.0/HTML5 user experience; doesn't track entire visits | YES: supports Web 2.0/HTML5, tracks entire visits and has UEM complaint resolution capability |
| On-premise solution | YES | YES | NO | YES |
| Response time, CPU, API breakdown, suspensions (GC) and IO time for each transaction | NO | NO | NO | YES |

Compuware dynaTrace is the APM vendor with the best fit for Kweo’s distributed data processing platform.

We are able to trace through Netty -> Apache Kafka -> Storm/Trident -> zeromq -> Storm/Trident -> Cassandra.

 

Topology Views

dynaTrace auto-maps the topology for each transaction (PurePath) or for entire timeframes, whilst providing instant, easy-to-read status overviews for each node and application (cluster).

 

Compuware dynaTrace auto-mapped topology tracing through Netty, Apache Kafka, Storm/Trident, zeromq and Apache Cassandra

System monitoring out of the box

These monitoring views come with zero configuration: the host monitoring dashboard is built into dynaTrace and requires no further setup.

 

dynaTrace host monitoring out of the box

Application Performance Management and alerting with zero configuration

As before, the application monitoring views come with zero configuration: the dashboard is built into dynaTrace and requires no further setup.

dynaTrace application monitoring out of the box

PurePaths – the distributed stack trace with detailed performance metrics

At Kweo we operate a highly distributed system that works asynchronously across many different protocols. It is important to understand what a single end-user click or service call does (or doesn’t do) in our system. Being able to trace back any distributed transaction is crucial to identifying issues quickly.

dynaTrace PurePath

API distribution per transaction

Each PurePath can be looked at from the API distribution perspective. This is very valuable: with one click you can see if a specific API (including 3rd-party APIs) is responsible for excessive CPU usage or slow response times.

API contribution

3 clicks to root cause – problem analysis made easy

Another effective way to quickly identify issues is to switch to the Contributors tab within a PurePath. This view flattens the distributed stack trace with its performance metrics and sorts it by the highest-contributing classes and methods. By default it also color-codes entries based on deviations in execution time.

 

One click on a PurePath and one click on the Contributors tab identify the culprits (auto color-coded)

 

Some things are meant to be slow, but be ready to handle the consequences

Some transactions are not really problems because they work as designed (password encryption is deliberately expensive in CPU and time), but when they pop up as in the image below, it is a reminder to implement effective measures so that overuse of these expensive service calls doesn’t result in a massive system failure.

dynaTrace API contribution

Auto-generated UML sequence diagrams for each transaction (PurePath)

This helps us validate our solution design against the real-life implementation. Things like 4,000 Cassandra calls for a single transaction (even if each takes only a few milliseconds) will be right in your face, telling you clearly there is a flaw in the design or implementation.

dynaTrace auto-generated UML sequence diagrams

In the following post we explain in detail how we trace through Storm.

 

 

Engineering at Kweo.com

M-Square is the developer and operator of Kweo.com, a highly scalable User Engagement Platform providing embeddable widgets to website operators and online media companies. Kweo is designed to handle millions of concurrent users and provides features like real-time comments with reddit-like ranking and polls/voting at massive scale to its subscribers.

As real-time is crucial, the architecture of Kweo is fundamental to our promise that our subscribers and their end users have the best user experience possible.

This is the first article of six in which we will discuss our architecture and the design decisions we made.

 

Core Requirements

 

  • All Kweo widgets must be able to run on Kweo subscribers’ websites without blocking site content or impacting performance
  • Users must not experience noticeable lag when they perform actions
  • All actions should feel instantly executed
  • Works on all major platforms, including mobile
  • Kweo must be able to store and process millions of events per second (important for polling/voting during major global events)
  • Kweo must be cost effective to operate and maintain
  • Must be fault-tolerant and highly available (works across datacenters and geographic regions)
  • Ability to instantly detect abnormal deviations in the Kweo stack, with auto-recovery and auto-scaling of infrastructure and applications

 

Kweo topology

 

Design Decisions

 

The core stack

 

 

DevOps

 

Future blog posts will describe in detail interesting topics from our daily engineering work: the libraries we use, the problems we have solved and the challenges we face.

 

Read the following blog posts:

Application Performance Management for distributed real-time data processing