Heavy Hitters in Redis

When running a public-facing website like Eventbrite, there are a number of reasons to keep track of your most active IP Addresses, API Keys or User IDs. Unfortunately, if you have a site that sees significant traffic, the volume of data you need to store and process to generate these “Heavy Hitter” statistics can become daunting.

To illustrate some of the issues with trying to generate these counts, let’s simulate a naive counting algorithm with some Python like this:

import operator
import random
from collections import defaultdict

A = 100   # number of distinct actors
N = 1000  # length of the data stream
T = 20    # number of heavy hitters to report

# generate some random user ids
data_points = [random.randrange(A) for _ in range(N)]

actual_count = defaultdict(int)
for item in data_points:
    actual_count[item] += 1

print(sorted(actual_count.items(), key=operator.itemgetter(1), reverse=True)[:T])

Before we go much further, let’s define a few constants to talk about the performance of the approaches:

N is the length of the data stream
A is the number of distinct observed data points
T is the number of heavy hitters we’d like to fetch

As previously alluded to, a few obvious problems arise every time you wish to produce the heavy hitters list using this naive approach. First, you need O(A) storage to hold the count information. Second, each time you want the statistics you pay O(A log A + T) time to sort the observed counts and pull out the top T actors. If you are running a website that sees (b|m)illions of items per day, these performance characteristics are unacceptable.

Streaming Algorithms to the Rescue

We started looking into the streaming data-sampling algorithms out there and settled on the SpaceSaving algorithm. The approach SpaceSaving takes is preferable because we can build precomputed, sorted heavy-hitters lists with a well-understood and acceptable amount of error built in.

Let’s define one more constant:

K is the number of previously seen items the SpaceSaving algorithm retains

Setting an appropriate K is crucial to the SpaceSaving algorithm, and involves knowing the distribution of the data you plan to send through it. We'll leave the discussion of this topic to the original paper, which does it more justice than we could in a quick blog post.
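To make the mechanics concrete, here is a minimal in-memory sketch of SpaceSaving (the function and variable names are ours, purely for illustration); it keeps at most K counters and applies one of three cases per incoming item, which is the same logic our Redis-backed version implements below:

def space_saving_update(counters, item, k):
    # counters is a plain dict of item -> estimated count, capped at k entries
    if item in counters:
        # case 1: the item is already tracked, so just bump its count
        counters[item] += 1
    elif len(counters) < k:
        # case 2: there is still room for a new counter
        counters[item] = 1
    else:
        # case 3: evict the current minimum and let the new item inherit its
        # count; this is where the (bounded) overestimation comes from
        min_item, min_count = min(counters.items(), key=lambda kv: kv[1])
        del counters[min_item]
        counters[item] = min_count + 1

counters = {}
for item in [1, 2, 1, 3, 1, 4, 2, 5]:
    space_saving_update(counters, item, k=3)
print(counters)  # tracked estimates are always >= the true counts

Note that this in-memory version still has to scan for the minimum counter on every eviction; picking a structure that makes the minimum cheap to find is exactly the question the next paragraph answers.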

Along with setting the proper K value, the other key decision in implementing SpaceSaving is which data structure to back it with. Because we need this to scale out to many writers and to retain state when processes die, we chose to back it with a known technology in our stack: Redis. After examining the operations SpaceSaving requires, we chose Redis’ sorted set for its O(log K) performance on almost all of the critical operations. With this implementation we get O(K) storage complexity and O(log K) time complexity per update. In addition, fetching the top T items is O(T + log K) with the chosen Redis implementation.

Our first implementation was done purely in Python, which incurred some overhead for every round trip between Python and Redis. We then rewrote the algorithm in Lua, which runs server-side in Redis, and saw a 50% performance boost on our test data set.
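For reference, the client-side logic looked roughly like the sketch below (an illustrative reconstruction assuming a recent redis-py, not our original code); each branch is a separate round trip to Redis, and the read-then-write is not atomic without extra locking, which is the overhead the server-side Lua script removes:

def update_from_python(r, myset, item, k):
    # same three SpaceSaving cases, issued command-by-command from the client
    member = str(item)
    if r.zrank(myset, member) is not None:
        # case 1: already tracked
        r.zincrby(myset, 1.0, member)
    elif r.zcard(myset) < k:
        # case 2: room for a new counter
        r.zadd(myset, {member: 1.0})
    else:
        # case 3: evict the minimum and inherit its count
        min_member, min_score = r.zrange(myset, 0, 0, withscores=True)[0]
        r.zrem(myset, min_member)
        r.zadd(myset, {member: min_score + 1.0})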

Here’s the final demo application, which generates a non-equiprobable dataset and runs it through the Redis/Lua-backed SpaceSaving algorithm. At the end, we display the top 20 heavy hitters and compare their estimated counts to the real observed counts:

import numpy
import redis
from collections import defaultdict


# 5k is pretty safe for our test, read the paper on SpaceSaving for more info on 
# how to tune K for your use case
K = 5000
T = 20
N = 10000
myset = "top_k_in_stream"

# decode_responses gives us plain strings back, so the comparison against actuals below works
r = redis.StrictRedis(decode_responses=True)

# clear out the data in our set, just in case
r.zremrangebyrank(myset, 0, -1)

hitters_script = """
local myset = ARGV[1]
local mykey = ARGV[3]
local set_length = tonumber(ARGV[2])

if redis.call('ZRANK', myset, mykey) then
    -- case 1: the item is already tracked, so just bump its count
    redis.call('ZINCRBY', myset, 1.0, mykey)
elseif redis.call('ZCARD', myset) < set_length then
    -- case 2: there is still room for a new counter
    redis.call('ZADD', myset, 1.0, mykey)
else
    -- case 3: evict the current minimum and let the new item inherit its count
    local value = redis.call('ZRANGE', myset, 0, 0, 'withscores')
    redis.call('ZREM', myset, value[1])
    redis.call('ZADD', myset, value[2] + 1.0, mykey)
end"""

hitters = r.register_script(hitters_script)

# generate a normally distributed data set for testing
# in practice, this would be some stream of values that you want
# to track (like ip addresses or user ids)
some_stream_of_data = [abs(int(x * 100)) for x in numpy.random.normal(0, .1, N)]

actuals = defaultdict(int)
for item in some_stream_of_data:
    # keep the actual count for comparison
    actuals[str(item)] += 1

    # send it through the SpaceSaving script in Redis
    hitters(args=[myset, K, item])

# grab the top T (20) hitters in the set
top_hitters = r.zrevrange(myset, 0, T - 1, withscores=True)

# iterate through and see how the estimated counts compare with the actual data
for (key, value) in top_hitters:
    difference = value - actuals[key]
    print("Value %s was %d greater than the actual count of %d" % (key, difference, actuals[key]))

Where to go from here:

The algorithm discussed above simply keeps frequent items; it has no concept of time or decay of the counts. Our implementation adds a concept of time via a bucketing mechanism, which lets us ask for the top actors over specified timeframes.
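As a rough illustration of how bucketing can look (the bucket width, key naming, and the ZUNIONSTORE merge below are our assumptions for this sketch, not a description of our production code), you can keep one sorted set per time bucket, feed each update to the current bucket, and merge the buckets you care about at query time; hitters, K and T refer to the script and constants from the demo above:

import time

BUCKET_SECONDS = 3600  # hypothetical bucket width: one hour

def bucket_key(ts=None):
    # one key per hour, e.g. "top_k_in_stream:<hour index since the epoch>"
    ts = ts if ts is not None else time.time()
    return "top_k_in_stream:%d" % (int(ts) // BUCKET_SECONDS)

def record(item):
    # run the same SpaceSaving Lua script, but against the current bucket
    hitters(args=[bucket_key(), K, item])

def top_hitters_last_n_hours(r, n, t=T):
    # ZUNIONSTORE sums the per-bucket scores; merging per-bucket SpaceSaving
    # estimates is itself an approximation, so treat the result accordingly
    now = int(time.time())
    buckets = [bucket_key(now - i * BUCKET_SECONDS) for i in range(n)]
    merged = "top_k_merged_tmp"
    r.zunionstore(merged, buckets)
    try:
        return r.zrevrange(merged, 0, t - 1, withscores=True)
    finally:
        r.delete(merged)

Expiring old bucket keys (for example with EXPIRE when a bucket is first written) keeps total storage bounded to K entries per retained bucket.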