Replayable Pub/Sub Queues with Cassandra and ZooKeeper

When you first play around with Cassandra and discover how fast it is at returning the columns for a row, it looks like an excellent choice for implementing a distributed queue. In reality, though, queues bring out two of Cassandra’s thorniest areas, tombstones and consistency levels, and are widely considered an antipattern.

Row-Based vs Column-Based

To implement a queue in Cassandra, you must choose between a row-based and a column-based design. In the row-based approach, each item to be processed is stored as its own row key. In the column-based approach, each item is stored as a column in a specific row.

With each item to be processed stored as a row key, consistency becomes a bottleneck. Because the items to process are not known in advance, getting range slices across row keys is the only way to fetch data; since the location and number of keys are unknown ahead of time, that operation ends up querying every node when all keys are needed. And since not all nodes are available at any given time, this is less than ideal.
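
The post describes the two layouts in terms of row keys and columns; a minimal sketch of the same contrast, expressed in CQL with the DataStax Python driver, might look like the following (the driver, keyspace, and table definitions are illustrative assumptions, not the authors' schema):

from cassandra.cluster import Cluster

session = Cluster(["127.0.0.1"]).connect("queues")  # hypothetical keyspace

# Row-based: every work item is its own row (partition). Finding pending work
# means scanning across partitions, which can touch every node in the cluster.
session.execute("""
    CREATE TABLE IF NOT EXISTS row_based_queue (
        item_id text PRIMARY KEY,
        payload text
    )
""")
pending = session.execute("SELECT item_id, payload FROM row_based_queue LIMIT 100")

# Column-based: all items for a queue live in one partition, stored as columns
# (clustering rows). A single-partition read returns them in order, but every
# consumed-and-deleted item leaves a tombstone in that same partition.
session.execute("""
    CREATE TABLE IF NOT EXISTS column_based_queue (
        queue_name text,
        item_id    timeuuid,
        payload    text,
        PRIMARY KEY (queue_name, item_id)
    )
""")
pending = session.execute(
    "SELECT item_id, payload FROM column_based_queue WHERE queue_name = %s LIMIT 100",
    ("orders",),
)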

Continue reading “Replayable Pub/Sub Queues with Cassandra and ZooKeeper”

Smarter Unit Testing with nose-knows

No one likes to break unit tests. You get all stressed about it, feel like you’ve let your peers down, and sometimes even have to get everyone donuts the next day. Our production Python codebase is complex, and the smallest changes can have an unexpectedly large impact; this is only complicated by the fact that Python is a dynamic language, making it hard to figure out what code touches what.

Enter nose-knows, a plugin for the nose unit test runner (and, experimentally, py.test). It traces your code while unit tests run and figures out which files are touched by which tests. Running your full test suite with code tracing turned on is expensive, so we have a daily Jenkins job that does it and writes the results to an output file. nose-knows can also work in the other direction: it knows how to leverage that file to run only the tests that touch a given piece of code.
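
nose-knows hooks into the test runner to do this, but the core idea, tracing execution while a test runs and recording every file it touches, can be sketched with nothing more than the standard library. The snippet below is an illustration of that concept under assumed names, not the plugin's actual implementation:

import sys
from collections import defaultdict

# Mapping of test name -> set of source files its execution touched.
files_touched_by_test = defaultdict(set)

def run_traced(test_name, test_func):
    """Run test_func under a tracer, recording every file whose code runs."""
    def tracer(frame, event, arg):
        if event == "call":
            files_touched_by_test[test_name].add(frame.f_code.co_filename)
        return tracer

    sys.settrace(tracer)
    try:
        test_func()
    finally:
        sys.settrace(None)

Inverting that mapping (from files to the tests that touch them) is what lets a nightly tracing run drive targeted test selection later.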

Continue reading “Smarter Unit Testing with nose-knows”

Watching Metadata Changes in a Distributed Application Using ZooKeeper

We created a distributed ETL system we affectionately call Mandoline. It is configurable, distributed, scalable, and easy to manage – here’s how we did it.

One of the hardest parts of building a distributed system is ensuring proper coordination between nodes across a cluster, and we decided to do it using Apache ZooKeeper. ZooKeeper can be imagined as a remote file system, where every file is also a folder (these are referred to as “znodes”). For example, let’s say we have the znode /mandoline where we store the system version, "1". Under /mandoline we may also store items like the configuration, so /mandoline/load_config stores our load configuration (in our case, a JSON dictionary for every source). The magic sauce of ZooKeeper is that it guarantees “strictly ordered access” to znodes, allowing synchronization and coordination to be built on top of it.
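
As a concrete illustration of that layout, here is roughly what creating and reading those znodes looks like with the kazoo Python client (the client library and the example configuration values are assumptions for illustration, not details from the post):

import json
from kazoo.client import KazooClient

zk = KazooClient(hosts="127.0.0.1:2181")
zk.start()

# Every znode is both a "file" (it holds a small value) and a "folder"
# (it can have children).
zk.ensure_path("/mandoline")
zk.set("/mandoline", b"1")  # the system version lives at the root znode

# /mandoline/load_config holds a JSON dictionary for every source.
load_config = {"orders": {"table": "Orders", "batch_size": 500}}  # hypothetical values
zk.ensure_path("/mandoline/load_config")
zk.set("/mandoline/load_config", json.dumps(load_config).encode())

value, stat = zk.get("/mandoline/load_config")
print(json.loads(value))

zk.stop()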

Mandoline coordinates its tasks using watchers, a neat ZooKeeper feature. You can specify a znode, and whether you’d like to watch changes to its data or its children, and ZooKeeper will notify you once those have happened. In the Mandoline architecture, processes choose specific znodes and use those to communicate changes in the data they are responsible for.
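
A minimal sketch of registering such a watch, again with kazoo as an assumed client (the znode path is hypothetical):

from kazoo.client import KazooClient

zk = KazooClient(hosts="127.0.0.1:2181")
zk.start()

# DataWatch re-registers itself after each notification, so the callback fires
# every time the znode's value changes rather than only once.
@zk.DataWatch("/mandoline/load_notification")
def on_load_notification(data, stat):
    if data is not None:
        print("loader checkpoint advanced to %s" % data.decode())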

For example, we have a component that loads orders from our Orders table in the database, and two other components that need to track: 1) the purchase history of a given user, and 2) the total sales for the corresponding event. Here is how the data-loading component does it:

latest_timestamp = 0
for datum in query_data:
    # Pull out the primary key; the rest of the dict is the column data.
    key = datum.pop(primary_key)

    # Track the newest checkpoint timestamp seen in this batch.
    timestamp = datum.pop(MANDOLINE_TIME_CHECKPOINT, 0)
    if timestamp > latest_timestamp:
        latest_timestamp = timestamp

    # The actual row goes into the Cassandra batch, not into ZooKeeper.
    main_batch.insert(key, datum)

# Publish only the small checkpoint value to the notification znode;
# retry() re-runs the set() if the ZooKeeper connection hiccups.
self.zk_client.retry(
    self.zk_client.set,
    self.load_notification_node,
    str(latest_timestamp),
)

Notice that many operations are performed for a given query, but only a small value (a timestamp, in this case) is written to ZooKeeper. Znodes are restricted in that they cannot hold large values, so the queue of items to actually work on is stored in Cassandra, while ZooKeeper handles the notification side.
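
To make that split concrete, here is a sketch of what a consumer of the loader's notification might look like: the znode carries only the checkpoint timestamp, and the actual work items are fetched from Cassandra. The libraries, table, and znode path below are assumptions for illustration, not Mandoline's actual code:

from cassandra.cluster import Cluster
from kazoo.client import KazooClient

session = Cluster(["127.0.0.1"]).connect("mandoline")  # hypothetical keyspace
zk = KazooClient(hosts="127.0.0.1:2181")
zk.start()

last_seen = 0

def handle(row):
    # Placeholder for whatever downstream processing the consumer does.
    print("processing", row.item_key)

@zk.DataWatch("/mandoline/load_notification")  # hypothetical znode path
def on_checkpoint(data, stat):
    global last_seen
    if data is None:
        return
    checkpoint = int(data)
    if checkpoint <= last_seen:
        return
    # The znode only says *that* new data is ready; the rows themselves come
    # from a Cassandra table keyed by the checkpoint timestamp.
    rows = session.execute(
        "SELECT item_key, payload FROM load_queue WHERE checkpoint = %s",
        (checkpoint,),
    )
    for row in rows:
        handle(row)
    last_seen = checkpoint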

Continue reading “Watching Metadata Changes in a Distributed Application Using ZooKeeper”